In [None]:
%%pyspark

#Read all JSON files in your data lake.
df = spark.read.load('abfss://datalakefs@mydatalake.dfs.core.windows.net/GoogleAnalytics/raw/*', format='json', multiLine=True)

#Provide the schema of the JSON files
df.printSchema()

#Shows number of files loaded based upon the count.
df.count()

In [None]:
#Display entries in the dataframe
display(df)

In [None]:
#Gather high level data, some of which is still metadata but scrubbed out the row entries and column mappings as we do that later.
df_single = df.select(
    "containsSampledData",
    "id",
    "itemsPerPage",
    "kind",
    "nextLink",
    "profileInfo.*",
    "query.*",
    "sampleSize",
    "sampleSpace",
    "selfLink",
    "totalResults",
    "totalsForAllResults.*"
);

display(df_single);

In [None]:
#'Flatten' out nested arrays in the rows column and map the names of the columns based upon the columnHeaders column data
#This example helped demonstrate the column mappings from the columnHeaders in the JSON file to the rows column
#https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/753971180331031/2636709537891264/8469274372247319/latest.html

column_names = map(lambda x: x['name'], df.select("columnHeaders").collect()[0][0])
rows = df.select("rows").collect()

row_list= []
for row in rows:
  for item in row[0]:
    row_list.append(item)

rows_rdd = sc.parallelize(row_list)
df_rows = spark.createDataFrame(rows_rdd, list(column_names), .01)
df_rows.count()

In [None]:
display(df_rows)

In [None]:
#Write to datalake as Parquet in YYYY/mm/dd/HHMMSS directory structure with name of parquest as either the summary data or individual

import datetime

date = datetime.datetime.now()

df_rows.write.parquet("abfss://datalakefs@mydatalake.dfs.core.windows.net/GoogleAnalytics/transformed/%s/%s/%s/%s_%s" % (date.strftime("%Y"), date.strftime("%m"), date.strftime("%d"), date.strftime("%H%M%S"), "ga_sessions"))
df_rows.write.parquet("abfss://datalakefs@mydatalake.dfs.core.windows.net/GoogleAnalytics/transformed/%s/%s/%s/%s_%s" % (date.strftime("%Y"), date.strftime("%m"), date.strftime("%d"), date.strftime("%H%M%S"), "ga_sessions_summary"))
