Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our [10-node, state-of-the-art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [None]:
from pyspark.sql import SparkSession

import getpass
# The OS login name is reused for the warehouse path and the application name.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled session on YARN. The UI port is set to '0',
# presumably so that Spark picks any free port - confirm against the Spark docs.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .appName(f'{username} | GitHub Activity')
    .master('yarn')
    .getOrCreate()
)

If you prefer to work from the command line, you can use Spark SQL through one of the following three approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

In [None]:
def from_files(spark, data_dir, file_pattern, file_format):
    """Load the files under ``data_dir`` that match ``file_pattern``.

    Args:
        spark: Session object whose ``read`` attribute is used to load data.
        data_dir: Directory that holds the input files.
        file_pattern: File name or glob, joined to ``data_dir`` with ``/``.
        file_format: Format name handed to the reader (e.g. ``'json'``).

    Returns:
        Whatever the reader's ``load`` call returns for the combined path.
    """
    source_path = f'{data_dir}/{file_pattern}'
    reader = spark.read.format(file_format)
    return reader.load(source_path)

In [None]:
from pyspark.sql.functions import year, \
    month, dayofmonth


def transform(df):
    """Add ``year``, ``month`` and ``day`` columns derived from ``created_at``.

    The input is registered as the temporary view ``ghactivity`` (replacing
    any existing view of that name) and queried through the notebook-level
    ``spark`` session.

    Args:
        df: DataFrame that has a ``created_at`` column.

    Returns:
        DataFrame with every input column plus ``year``, ``month`` and ``day``.
    """
    # The query refers to the data by view name, so register it first.
    df.createOrReplaceTempView('ghactivity')
    return spark.sql("""
        SELECT g.*,
            year(created_at) AS year,
            month(created_at) AS month,
            day(created_at) AS day
        FROM ghactivity AS g
    """)

In [None]:
%%sh

# List the contents of /public/gharchive in HDFS.
hdfs dfs -ls /public/gharchive

In [None]:
# Load the JSON files under /public/gharchive whose names match 2021-01-14-*.
df = from_files(spark, '/public/gharchive', '2021-01-14-*', 'json')

In [None]:
# Inspect the schema of the loaded data.
df.printSchema()

In [None]:
# Preview the fields nested under `repo` alongside `created_at`.
df.select('repo.*', 'created_at').show()

In [None]:
# Add the year, month and day columns derived from created_at.
df_transformed = transform(df)

In [None]:
# Check that year, month and day now appear in the schema.
df_transformed.printSchema()

In [None]:
# Keep only the repo fields, created_at and the derived date-part columns.
df_transformed = df_transformed.select('repo.*', 'created_at', 'year', 'month', 'day')

In [None]:
# Preview the projected rows.
df_transformed.show()

In [None]:
# Inspect the schema of the projected DataFrame.
df_transformed.printSchema()

In [None]:
# Expose the projected data to SQL as the ghactivity_transformed temporary view.
df_transformed.createOrReplaceTempView('ghactivity_transformed')

In [None]:
# Display the writer object exposed by `write`; nothing is written by this cell.
df_transformed.write

In [None]:
# Create the per-user database; IF NOT EXISTS keeps this cell re-runnable.
spark.sql(f'CREATE DATABASE IF NOT EXISTS {username}_github')

In [None]:
# Create the Parquet table partitioned by year/month/day; IF NOT EXISTS keeps
# this cell re-runnable once the table has been created.
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {username}_github.itv_ghactivity (
    id BIGINT,
    name STRING,
    url STRING,
    created_at STRING
) PARTITIONED BY (year INT, month INT, day INT)
STORED AS parquet
''')

In [None]:
# Switch Hive dynamic partitioning to nonstrict mode; the INSERT that follows
# supplies no static partition values.
spark.sql('set hive.exec.dynamic.partition.mode=nonstrict')

In [None]:
# Append the rows of the ghactivity_transformed view to the partitioned table.
# No PARTITION clause is given, so SELECT * must also supply year, month and
# day - presumably in the table's column order; verify against the view schema.
spark.sql(f'''
    INSERT INTO TABLE {username}_github.itv_ghactivity
    SELECT * FROM ghactivity_transformed
''')

In [None]:
# Read the table back through SQL to check the inserted rows.
spark.sql(f'SELECT * FROM {username}_github.itv_ghactivity').show()

In [None]:
# Load the same table through the DataFrame reader API.
df1 = spark.read.table(f'{username}_github.itv_ghactivity')

In [None]:
# Preview the rows read from the table.
df1.show()

In [None]:
# Show the table's metadata: up to 200 rows, without truncating long values.
spark.sql(f'DESCRIBE FORMATTED {username}_github.itv_ghactivity').show(200, truncate=False)

In [None]:
# List the table's partitions without truncating the partition strings.
spark.sql(f'SHOW PARTITIONS {username}_github.itv_ghactivity').show(truncate=False)

In [None]:
# Count the table's rows per calendar date of created_at.
(
    spark.sql(f'''
    SELECT to_date(created_at) AS created_at,
        count(1) AS activity_count
    FROM {username}_github.itv_ghactivity
    GROUP BY to_date(created_at)
''')
    .show()
)

In [None]:
def to_files(df, tgt_dir, file_format, *, num_files=16,
             partition_cols=('year', 'month', 'day'), mode='append'):
    """Write ``df`` to ``tgt_dir`` as partitioned files.

    The defaults reproduce the previously hard-coded behaviour, so existing
    three-argument calls are unaffected.

    Args:
        df: DataFrame to persist; it needs every column in ``partition_cols``.
        tgt_dir: Target directory handed to the writer's ``save``.
        file_format: Output format name (e.g. ``'parquet'``).
        num_files: Value passed to ``coalesce`` before writing.
        partition_cols: Column names passed to ``partitionBy``.
        mode: Save mode handed to the writer.
    """
    writer = df.coalesce(num_files).write
    writer = writer.partitionBy(*partition_cols).mode(mode).format(file_format)
    writer.save(tgt_dir)

In [None]:
# Load the JSON files under /public/gharchive whose names match 2021-01-15-*.
df = from_files(spark, '/public/gharchive', '2021-01-15-*', 'json')

In [None]:
# Add the year, month and day columns derived from created_at.
df_transformed = transform(df)

In [None]:
# Keep only the repo fields, created_at and the derived date-part columns.
df_transformed = df_transformed.select('repo.*', 'created_at', 'year', 'month', 'day')

In [None]:
# Write the Parquet files straight into the table's warehouse directory. The
# database folder is `{username}_github.db`, matching the `{username}_github`
# database name; without the underscore the files land outside the table.
to_files(
    df_transformed,
    f'hdfs://nn01.itversity.com:8020/user/{username}/warehouse/{username}_github.db/itv_ghactivity',
    'parquet'
)

In [None]:
# Count the table's rows per calendar date of created_at.
(
    spark.sql(f'''
    SELECT to_date(created_at) AS created_at,
        count(1) AS activity_count
    FROM {username}_github.itv_ghactivity
    GROUP BY to_date(created_at)
''')
    .show()
)

In [None]:
%%sh

# List what is stored under the table's year=2021/month=1 directory in HDFS.
hdfs dfs -ls hdfs://nn01.itversity.com:8020/user/${USER}/warehouse/${USER}_github.db/itv_ghactivity/year=2021/month=1

In [None]:
# List the partitions currently reported for the table.
spark.sql(f'SHOW PARTITIONS {username}_github.itv_ghactivity').show(truncate=False)

In [None]:
# Repair the table's partition metadata - presumably this registers partition
# directories that exist on the file system but are not yet in the metastore.
spark.sql(f'MSCK REPAIR TABLE {username}_github.itv_ghactivity')

In [None]:
# List the partitions again to see the effect of the repair.
spark.sql(f'SHOW PARTITIONS {username}_github.itv_ghactivity').show(truncate=False)