In [1]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import os, json
from pyspark.sql.functions import *

spark = (
    SparkSession.builder.appName("LofiHipHop")
    .getOrCreate()
)

In [2]:
path_to_json = '../datalake/bronze/lofi-hiphop/'
json_files = [  '../datalake/bronze/lofi-hiphop/' + pos_json for pos_json in os.listdir(path_to_json) 
              if pos_json.endswith('.json')]

df = (
    spark
    .read
    .format("json")
    .load(json_files, multiLine=True)
)

In [3]:
cols = ["author", "messageEx"]
df = (
    df
    .withColumn("badgeUrl", col("author.badgeUrl"))
    .withColumn("channelId", col("author.channelId"))
    .withColumn("channelUrl", col("author.channelUrl"))
    .withColumn("imageUrl", col("author.imageUrl"))
    .withColumn("isChatModerator", col("author.isChatModerator"))
    .withColumn("isChatOwner", col("author.isChatOwner"))
    .withColumn("isChatSponsor", col("author.isChatSponsor"))
    .withColumn("isVerified", col("author.isVerified"))
    .withColumn("name", col("author.name"))
    .withColumn("type", col("author.type"))
    .withColumn("year", year(col("datetime")))
    .withColumn("month", month(col("datetime")))
    .drop(*cols)

)

In [4]:
df.printSchema()

root
 |-- amountString: string (nullable = true)
 |-- amountValue: double (nullable = true)
 |-- bgColor: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- elapsedTime: string (nullable = true)
 |-- id: string (nullable = true)
 |-- message: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- type: string (nullable = true)
 |-- badgeUrl: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelUrl: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- isChatModerator: boolean (nullable = true)
 |-- isChatOwner: boolean (nullable = true)
 |-- isChatSponsor: boolean (nullable = true)
 |-- isVerified: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [5]:
df_pandas = df.toPandas()

In [6]:
import pandas as pd
pd.set_option("display.max_columns", None)

df_pandas.head()

Unnamed: 0,amountString,amountValue,bgColor,currency,datetime,elapsedTime,id,message,timestamp,type,badgeUrl,channelId,channelUrl,imageUrl,isChatModerator,isChatOwner,isChatSponsor,isVerified,name,year,month
0,,0.0,0,,2022-07-07 16:37:42,,CjkKGkNPMnMtY0RENV9nQ0ZWY0kxZ0FkT3JvQTdREhtDSk...,:cheese_wedge::cheese_wedge::cheese_wedge::che...,1657222662608,,,UCsP6RJcL40340EAfiCC5ehg,http://www.youtube.com/channel/UCsP6RJcL40340E...,https://yt4.ggpht.com/ytc/AKedOLQ9tQints7Vmz2u...,False,False,False,False,Bergadapupi,2022,7
1,,0.0,0,,2022-07-07 16:34:26,,CkUKGkNKZi1udVBDNV9nQ0ZTNEkxZ0FkdGRFRVhBEidDSn...,الله اكبر زيد اضرب بيد من حديد,1657222466092,,,UCnL3SbCme0QqcuLmwK5SQ8Q,http://www.youtube.com/channel/UCnL3SbCme0Qqcu...,https://yt4.ggpht.com/_8BIn3TNPkqLab-6ViEsddbk...,False,False,False,False,ted sy,2022,7
2,,0.0,0,,2022-07-07 16:34:30,,CjoKGkNMUGN0LVhDNV9nQ0ZkQUwxZ0FkTnFzSDlBEhxDSU...,Music was so nice :smiling_face_with_halo::rel...,1657222470692,,,UCVDJ3Y-gqC2ro2v71fyRPDA,http://www.youtube.com/channel/UCVDJ3Y-gqC2ro2...,https://yt4.ggpht.com/ytc/AKedOLTh5pj_2JG5eIof...,False,False,False,False,Аббос Рахманов,2022,7
3,,0.0,0,,2022-07-07 16:34:30,,CjoKGkNMUGN0LVhDNV9nQ0ZkQUwxZ0FkTnFzSDlBEhxDSU...,Music was so nice :smiling_face_with_halo::rel...,1657222470692,,,UCVDJ3Y-gqC2ro2v71fyRPDA,http://www.youtube.com/channel/UCVDJ3Y-gqC2ro2...,https://yt4.ggpht.com/ytc/AKedOLTh5pj_2JG5eIof...,False,False,False,False,Аббос Рахманов,2022,7
4,,0.0,0,,2022-07-07 16:34:31,,CkUKGkNMZXZfZVhDNV9nQ0Zab0kxZ0FkdTlVS05nEidDSX...,@Hasan Karabayır o zaman niye eski dost diyors...,1657222471833,,,UCP3nAPGXXbgzrI-jyMYA7bw,http://www.youtube.com/channel/UCP3nAPGXXbgzrI...,https://yt4.ggpht.com/ytc/AKedOLQVBjOzNv0v0R86...,False,False,False,False,Fatih,2022,7


In [7]:
df_pandas.to_parquet(
    '../datalake/silver/lofi-hiphop/data.parquet',
    partition_cols=['year', 'month'])