# Landing Zone to Bronze Layer

In [None]:
import datetime
import numpy as np
from pyspark.sql import types as T 
from pyspark.sql import functions as F

In [None]:
### Fetch the credential used for OAuth access to the Azure container
# The secret is read from the "databricks" secret scope, key "databricks-test".
service_credential = dbutils.secrets.get(
    scope="databricks",
    key="databricks-test",
)
# Placeholder kept from the original notebook: the spark.conf.set(...) calls
# that consume `service_credential` belong here (they are not part of this file).

In [None]:
### Get all file paths in landing zone
# List the landing-zone directory. Later cells rely on each entry exposing the
# fields `path`, `name`, `size` and `modificationTime`.
# (Plain string literal: the path has no placeholders, so no f-string is needed.)
y = dbutils.fs.ls("abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part")

# Turn the driver-side listing into a Spark DataFrame, one row per file.
rdd = spark.sparkContext.parallelize(y)
md = rdd.toDF()

In [None]:
### Organise table of file paths as a DataFrame
# File names look like "2017-01-01-0.json.gz" (see the listing output below):
# year, month and day sit at fixed offsets, while the hour is 1 or 2 digits wide.
md = (md
      .withColumn('year', F.substring('name', 1, 4).cast(T.IntegerType()))
      .withColumn('month', F.substring('name', 6, 2).cast(T.IntegerType()))
      .withColumn('day', F.substring('name', 9, 2).cast(T.IntegerType()))
      # A fixed-width substring(12, 2) yields "0." for single-digit hours, which
      # only becomes an integer under lenient string-to-int casting.
      # NOTE(review): extracting just the digits avoids depending on that.
      .withColumn('hour', F.regexp_extract('name', r'-(\d{1,2})\.json', 1).cast(T.IntegerType()))
      .drop('name', 'modificationTime')
      .orderBy('year','month','day','hour')
     )

# Render the table in the notebook. display() is used for its side effect only;
# it does not return the rows, so its result must not be bound to `x`.
md.display()

# Bring the (small) ordered listing to the driver as a list of Rows
# (path, size, year, month, day, hour) so later cells can index and len() it.
x = md.collect()

path,size,year,month,day,hour
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-0.json.gz,4479147,2017,1,1,0
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-1.json.gz,4336012,2017,1,1,1
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-2.json.gz,4395150,2017,1,1,2
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-3.json.gz,4289567,2017,1,1,3
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-4.json.gz,3929214,2017,1,1,4
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-5.json.gz,3597704,2017,1,1,5
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-6.json.gz,3213933,2017,1,1,6
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-7.json.gz,3697796,2017,1,1,7
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-8.json.gz,3902117,2017,1,1,8
abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part/2017-01-01-9.json.gz,4238152,2017,1,1,9


In [None]:
### From miniro_exploration, we know that parquet files are about double
# the size of our compressed json. We use this, together with the size of the
# json files, to choose the number of sub-partitions per day (target 128MB).

# Estimated parquet size relative to the compressed json size.
PARQUET_TO_JSON_RATIO = 2
# Target parquet file size in bytes; dividing by it gives the LARGEST sensible
# number of files per day.
TARGET_FILE_BYTES = 128000000
# Largest file size considered here, in bytes; dividing by it gives the
# SMALLEST sensible number of files per day.
# NOTE(review): the 400MB ceiling comes from the original notebook - confirm.
MAX_FILE_BYTES = 400000000
HOURS_PER_DAY = 24

# Size in bytes of every hourly json file, in (year, month, day, hour) order.
# Read straight from `md` by column name so this cell does not depend on the
# return value of a display call.
json_hour_size = [row['size'] for row in md.select('size').collect()]

# The reshape below groups consecutive hourly files into days, so the listing
# must consist of complete days; fail with a clear message otherwise.
if not json_hour_size or len(json_hour_size) % HOURS_PER_DAY != 0:
    raise ValueError(
        f"expected a non-empty multiple of {HOURS_PER_DAY} hourly files, "
        f"got {len(json_hour_size)}"
    )

# Get size of each day: one row per day, 24 hourly sizes per row, summed.
json_day_size = np.sum(np.array(json_hour_size).reshape(-1, HOURS_PER_DAY), axis=1)

parquet_day_size = json_day_size * PARQUET_TO_JSON_RATIO

# Per-day upper and lower bounds on the number of files, rounded to 1 decimal.
partition_sizes_max = np.round(parquet_day_size / TARGET_FILE_BYTES, 1)
partition_sizes_min = np.round(parquet_day_size / MAX_FILE_BYTES, 1)

In [None]:
# Tightest upper bound: the smallest, over all days, of the per-day maximum
# number of partitions. A count at or below this keeps every day within it.
np.min(partition_sizes_max)

In [None]:
# Tightest lower bound: the largest, over all days, of the per-day minimum
# number of partitions. A count at or above this keeps every day within it.
np.max(partition_sizes_min)

### 2 and 3 partitions per day would both seem reasonable choices. We pick 3.

In [None]:
### Write data logically partitioned by day, with up to 3 parquet files per day
# (Plain string literals: neither path has placeholders, so no f-strings.)
landing_zone_path = "abfs://landing-zone@ade20220919.dfs.core.windows.net/gharchive-part"
bronze_path = "abfs://bronze-layer@ade20220919.dfs.core.windows.net/miniro/2017-01"

# Read in all of the landing zone data
df = spark.read.json(landing_zone_path)

# Add a `day` column: the first 10 characters of `created_at`.
# NOTE(review): presumably the "YYYY-MM-DD" part of an ISO timestamp - confirm.
df = df.withColumn("day", F.col("created_at").substr(1, 10))

# repartition(3) spreads the rows over 3 Spark partitions; partitionBy('day')
# then makes each partition write one file into every day directory it holds
# rows for, i.e. at most 3 files per day.
df.repartition(3).write.partitionBy('day').parquet(bronze_path)

In [None]:
### Show every distinct event type present in the data
# Collect the distinct values of the `type` column to the driver as Rows.
types = df.select("type").distinct().collect()

# Unwrap each single-column Row into its plain value.
type_list = [row[0] for row in types]

# Last expression of the cell, so the notebook prints the list.
type_list