In [0]:
# List the secret scopes visible to this notebook; the cell output is the list itself.
dbutils.secrets.listScopes()

In [0]:
# Read the secret stored under key "youtube-key-vault" in scope "youtube-db-scope".
# NOTE(review): the value is bound to `df` although it is a secret, not a DataFrame,
# and no later cell shown here reads `df` — confirm whether this cell is still needed.
df = dbutils.secrets.get(
    key="youtube-key-vault",
    scope="youtube-db-scope",
)

In [0]:
# OAuth client-credentials settings used by the ABFS driver for this mount.
# The client secret is read from the Databricks secret scope at run time so it is
# never written into the notebook.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "d1df4878-b934-4df1-8357-06dd1a8ee374",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="youtube-db-scope", key="youtube-key-vault"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/fdc798aa-6f15-4e72-bf4a-6c813690fc75/oauth2/token",
}

# Optionally, you can add <directory-name> to the source URI of your mount point.
mount_source = "abfss://youtube-data-analysis@adlsgen2youtube.dfs.core.windows.net/"
mount_point = "/mnt/youtube-analysis-data"

# dbutils.fs.mount fails when the mount point is already in use, so mount only when
# it is missing. This keeps the cell safe to re-run (Restart & Run All).
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        source=mount_source,
        mount_point=mount_point,
        extra_configs=configs,
    )

In [0]:
# Python form of `%fs ls`: list the top level of the mounted container.
display(dbutils.fs.ls("/mnt/youtube-analysis-data"))

In [0]:
# Python form of `%fs ls`: list the files in the statistics reference-data folder.
display(dbutils.fs.ls("dbfs:/mnt/youtube-analysis-data/transformed-data/statistics-reference-data/"))

In [0]:
# Load everything matched by the glob under the statistics reference-data folder as Parquet.
reference_data_glob = '/mnt/youtube-analysis-data/transformed-data/statistics-reference-data/*'
statisticsReferenceDF = spark.read.parquet(reference_data_glob)
display(statisticsReferenceDF)

In [0]:
from pyspark.sql.functions import input_file_name, regexp_extract


In [0]:
# Record, for every row, the path of the file the row was read from.
source_file_path = input_file_name()
statisticsReferenceDF = statisticsReferenceDF.withColumn('FileName', source_file_path)
display(statisticsReferenceDF)

In [0]:
# Keep only the trailing run of non-"/" characters of FileName (the last path segment).
last_path_segment = regexp_extract("FileName", "([^/]+$)", 0)
statisticsReferenceDF = statisticsReferenceDF.withColumn('FileName', last_path_segment)
display(statisticsReferenceDF)

In [0]:
from pyspark.sql.functions import substring

# CountryCode is the first two characters of FileName.
file_name_prefix = substring('FileName', 1, 2)
statisticsReferenceDF = statisticsReferenceDF.withColumn('CountryCode', file_name_prefix)
display(statisticsReferenceDF)

In [0]:
from pyspark.sql.functions import split

# Split `kind` on "#": element 0 becomes Platform, element 1 becomes Category.
kind_parts = split('kind', '#')
statisticsReferenceDF = (
    statisticsReferenceDF
    .withColumn('Platform', kind_parts[0])
    .withColumn('Category', kind_parts[1])
)
display(statisticsReferenceDF)

In [0]:
# Remove the raw `kind` column from the frame.
column_to_drop = 'kind'
statisticsReferenceDF = statisticsReferenceDF.drop(column_to_drop)
display(statisticsReferenceDF)

In [0]:
from pyspark.sql.functions import col

# Replace id with its integer-typed version.
id_as_int = col('id').cast('int')
statisticsReferenceDF = statisticsReferenceDF.withColumn('id', id_as_int)
display(statisticsReferenceDF)

In [0]:
from pyspark.sql.functions import regexp_replace

# Strip double-quote characters from etag and remove the text "Category" from category.
etag_without_quotes = regexp_replace('etag', '"', '')
category_without_suffix = regexp_replace('category', 'Category', '')
statisticsReferenceDF = (
    statisticsReferenceDF
    .withColumn('etag', etag_without_quotes)
    .withColumn('category', category_without_suffix)
)
display(statisticsReferenceDF)

In [0]:
# Final shape of the reference dataset: only these five columns, in this order.
reference_columns = ['CountryCode', 'id', 'title', 'category', 'Platform']
statisticsReferenceDataset = statisticsReferenceDF.select(*reference_columns)
display(statisticsReferenceDataset)

In [0]:
# Load the raw-statistics folder of the mount as Parquet.
raw_statistics_path = '/mnt/youtube-analysis-data/transformed-data/raw-statistics/'
statisticsDF = spark.read.parquet(raw_statistics_path)
display(statisticsDF)

In [0]:
from pyspark.sql.functions import to_date, col

# statisticsUSDF is created here, from the frame loaded out of raw-statistics
# (statisticsDF); no earlier cell assigns it, so it cannot be used as its own input.
# trending_date is parsed with the pattern 'yy.dd.MM' (two-digit year, day, month).
statisticsUSDF = statisticsDF.withColumn(
    'trending_date',
    to_date(col('trending_date'), 'yy.dd.MM'),
)

In [0]:
from pyspark.sql.functions import date_format,to_timestamp

# published_time: publish_time converted with the pattern 'HH:mm:ss'. A single
# to_timestamp call is enough — wrapping its result in a second to_timestamp
# only converts a timestamp to a timestamp.
# NOTE(review): the pattern has no date part, yet publish_date below is derived
# from the same column with to_date — confirm the pattern matches how
# publish_time is actually stored.
# publish_date: publish_time converted to a date.
statisticsUSDF = (
    statisticsUSDF
    .withColumn('published_time', to_timestamp(col('publish_time'), 'HH:mm:ss'))
    .withColumn('publish_date', to_date(col('publish_time')))
)
display(statisticsUSDF)

In [0]:
from pyspark.sql.functions import upper,col

# Cast each of these columns to int, in this order, then upper-case region.
for int_column in ('views', 'likes', 'dislikes', 'comment_count', 'category_id'):
    statisticsUSDF = statisticsUSDF.withColumn(int_column, col(int_column).cast('int'))
statisticsUSDF = statisticsUSDF.withColumn('region', upper('region'))

display(statisticsUSDF)

In [0]:
from pyspark.sql.functions import col, datediff, dayofmonth, hour, when


def _week_of_month(day_of_month):
    """Map a day-of-month Column to a week bucket 1-5.

    Days 1-7 -> 1, 8-14 -> 2, 15-21 -> 3, 22-28 -> 4, everything else -> 5.
    "Everything else" is days 29-31, and also a null day, because a null
    comparison matches none of the `when` branches.
    """
    return (
        when(day_of_month <= 7, 1)
        .when((day_of_month >= 8) & (day_of_month <= 14), 2)
        .when((day_of_month >= 15) & (day_of_month <= 21), 3)
        .when((day_of_month >= 22) & (day_of_month <= 28), 4)
        .otherwise(5)  # For days 29-31
    )


# published_time is a timestamp, so the hour-of-day has to be extracted before it
# can be compared with the hour boundaries below.
publish_hour = hour(col("published_time"))

# Hours 0-4 (and a null published_time) fall through to "Late Night".
time_period = (
    when((publish_hour >= 5) & (publish_hour < 9), "Early Morning")
    .when((publish_hour >= 9) & (publish_hour < 12), "Morning")
    .when((publish_hour >= 12) & (publish_hour < 13), "Noon")
    .when((publish_hour >= 13) & (publish_hour < 17), "Afternoon")
    .when((publish_hour >= 17) & (publish_hour < 20), "Evening")
    .when((publish_hour >= 20) & (publish_hour <= 23), "Night")
    .otherwise("Late Night")
)

statisticsUSDF = (
    statisticsUSDF
    .withColumn("time_period", time_period)
    .withColumn("publish_week_of_month", _week_of_month(dayofmonth(col("publish_date"))))
    .withColumn("trend_week_of_month", _week_of_month(dayofmonth(col("trending_date"))))
    # Number of days from publish_date to trending_date.
    .withColumn('trending_IN', datediff(col('trending_date'), col('publish_date')))
)
display(statisticsUSDF)

In [0]:
# Make sure published_time is typed as timestamp before the final column selection.
statisticsUSDF = statisticsUSDF.withColumn(
    'published_time',
    col('published_time').cast('timestamp'),
)

In [0]:
# Columns of the final statistics frame, in output order.
statistics_columns = [
    'region', 'category_id', 'title', 'channel_title',
    'views', 'likes', 'dislikes', 'comment_count',
    'comments_disabled', 'ratings_disabled', 'video_error_or_removed',
    'publish_date', 'trending_date', 'published_time', 'time_period',
    'publish_week_of_month', 'trend_week_of_month', 'trending_IN',
    'tags', 'description',
]
statisticsDF = statisticsUSDF.select(*statistics_columns)

In [0]:
# Render the final statistics frame.
display(statisticsDF)