## Prepare table in raw layer

Let us prepare a table in the raw layer. It will act as the golden copy of our source data.
* The files in the landing folder are in JSON format.
* No partitioning strategy is applied while copying the files into the landing zone.
* Parquet is preferred over other file formats for storing data in the Data Lake storage layer (HDFS in this case).
* Daily partitions are generally preferred, so that the data can be processed further in an incremental fashion.
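With daily partitions, incremental processing comes down to picking the days that have not been loaded yet. A minimal sketch, assuming a hypothetical helper `dates_to_process` that receives the last processed day (a watermark) and the current day as `YYYY-MM-DD` strings, the same format as the `process_dt` value used later:

```python
from datetime import date, timedelta
from typing import List


def dates_to_process(last_processed_dt: str, current_dt: str) -> List[str]:
    """Return every YYYY-MM-DD day after last_processed_dt up to and including current_dt.

    Hypothetical helper: each returned value could be used as a process_dt
    to load exactly one daily partition.
    """
    day = date.fromisoformat(last_processed_dt) + timedelta(days=1)
    end = date.fromisoformat(current_dt)
    days = []
    while day <= end:
        days.append(day.isoformat())
        day += timedelta(days=1)
    return days


print(dates_to_process('2021-01-11', '2021-01-13'))
# ['2021-01-12', '2021-01-13']
```

Each day maps to one partition, so reruns and backfills touch only the days that need them.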

Here are the steps we are going to follow. For now, we will run them manually; later, we need to automate and orchestrate them.
* Make sure the files are available in the landing zone.
* Read the data from the JSON files in the landing zone and create a Data Frame.
* Add additional columns as per the partitioning strategy. We are going to partition by year, then month, then day, using one of the date fields in the input data (`created_at`).
* Partition the Data Frame by year, month and day, and then write it to the target table in the **{username}_ghraw_db** database.
* Make sure the data is accessible using Spark SQL queries.
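For the first step, a simple completeness check can compare the file names in the landing zone against the hourly files expected for one day. This is a sketch under two assumptions: the landing files keep the GitHub Archive naming `YYYY-MM-DD-H.json.gz` (one file per hour, `H` from 0 to 23), and the bare file names have already been listed (for example from `hdfs dfs -ls`). `missing_hours` is a hypothetical helper:

```python
from fnmatch import fnmatch
from typing import Iterable, List


def missing_hours(file_names: Iterable[str], process_dt: str) -> List[int]:
    """Return the hours (0-23) of process_dt that have no file in the landing zone.

    Assumes bare GitHub Archive style names such as 2021-01-13-0.json.gz.
    """
    # Same glob that is passed to spark.read.json for this date
    pattern = f'{process_dt}-*.json.gz'
    present = set()
    for name in file_names:
        if fnmatch(name, pattern):
            # Strip the "YYYY-MM-DD-" prefix and the ".json.gz" suffix to get the hour
            hour = name[len(process_dt) + 1:-len('.json.gz')]
            if hour.isdigit():
                present.add(int(hour))
    return [h for h in range(24) if h not in present]


landing = [f'2021-01-13-{h}.json.gz' for h in range(24) if h != 5]
print(missing_hours(landing, '2021-01-13'))
# [5]
```

An empty list means all 24 hourly files for the day are present and the load can start.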

In [None]:
%%sh

hdfs dfs -ls /user/${USER}/itv-github/landing

In [1]:
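from pyspark.sql import SparkSession

import getpass
# Use the OS user name to build user-specific paths and database names
username = getpass.getuser()

# Spark session on YARN with Hive support, so that tables are registered in the metastore.
# spark.ui.port set to 0 lets Spark pick a free port for the UI.
spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    appName(f'{username} | Analyze GitHub Archive Data'). \
    master('yarn'). \
    getOrCreate()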

In [None]:
spark.conf.set('spark.sql.shuffle.partitions', 8)

In [None]:
process_dt = '2021-01-13'
ghdata = spark. \
    read. \
    json(f'/user/{username}/itv-github/landing/{process_dt}-*.json.gz')

In [None]:
from pyspark.sql.functions import substring, col

In [None]:
spark.sql(f'CREATE DATABASE IF NOT EXISTS {username}_ghraw_db')

In [None]:
spark.sql(f'DROP TABLE IF EXISTS {username}_ghraw_db.ghactivity')

In [None]:
%%sh

hdfs dfs -ls /user/${USER}/warehouse/${USER}_ghraw_db.db

In [None]:
%%sh

hdfs dfs -rm -R -skipTrash /user/${USER}/warehouse/${USER}_ghraw_db.db/ghactivity

In [None]:
ghdata.printSchema()

In [None]:
ghdata. \
    withColumn('year', substring('created_at', 1, 4)). \
    withColumn('month', substring('created_at', 6, 2)). \
    withColumn('day', substring('created_at', 9, 2)). \
    select('repo.*', 'actor.*', 'org.*', 'created_at', 'year', 'month', 'day'). \
    show()

In [None]:
ghdata = ghdata. \
    withColumn('year', substring('created_at', 1, 4)). \
    withColumn('month', substring('created_at', 6, 2)). \
    withColumn('day', substring('created_at', 9, 2))
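Spark's `substring(str, pos, len)` uses 1-based positions, so `substring('created_at', 6, 2)` takes two characters starting at the sixth. The same extraction in plain Python (0-based slicing) makes the offsets easy to check. This assumes `created_at` is an ISO 8601 string such as `2021-01-13T05:00:00Z`; `partition_values` is a hypothetical helper:

```python
from typing import Tuple


def partition_values(created_at: str) -> Tuple[str, str, str]:
    """Mirror the year/month/day columns derived with Spark's 1-based substring.

    substring(created_at, 1, 4) -> created_at[0:4]
    substring(created_at, 6, 2) -> created_at[5:7]
    substring(created_at, 9, 2) -> created_at[8:10]
    """
    year = created_at[0:4]
    month = created_at[5:7]
    day = created_at[8:10]
    return year, month, day


print(partition_values('2021-01-13T05:00:00Z'))
# ('2021', '01', '13')
```

Because the values stay strings, leading zeros are kept, so directories such as `month=01` and `month=12` also sort correctly as text.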

In [None]:
# Write Parquet files partitioned into year=/month=/day= directories
ghdata. \
    write. \
    format('parquet'). \
    partitionBy('year', 'month', 'day'). \
    saveAsTable(f'{username}_ghraw_db.ghactivity')

In [None]:
spark.sql(f'SHOW PARTITIONS {username}_ghraw_db.ghactivity').show(truncate=False)
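`SHOW PARTITIONS` returns one string per partition in the form `year=2021/month=01/day=13`, matching the directory names written by `partitionBy`. A minimal sketch, using the hypothetical helper `partition_to_date`, of turning such a value back into a date, for example to compare the loaded days with the `process_dt` values you expect:

```python
from datetime import date


def partition_to_date(partition: str) -> date:
    """Convert a Hive-style partition spec such as year=2021/month=01/day=13 to a date."""
    # 'year=2021/month=01/day=13' -> {'year': '2021', 'month': '01', 'day': '13'}
    spec = dict(part.split('=', 1) for part in partition.split('/'))
    return date(int(spec['year']), int(spec['month']), int(spec['day']))


print(partition_to_date('year=2021/month=01/day=13'))
# 2021-01-13
```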

In [None]:
%%sh

hdfs dfs -ls -R /user/${USER}/warehouse/${USER}_ghraw_db.db/ghactivity

In [None]:
spark.sql(f'''
    SELECT substring(created_at, 1, 10) AS created_dt, count(1) AS event_count
    FROM {username}_ghraw_db.ghactivity
    GROUP BY created_dt
    ORDER BY created_dt
'''). \
    show()