In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [0]:
# Mount the `gold` container of the youtubeadlsg2 storage account at /mnt/gold,
# skipping the mount when that mount point is already registered.
directory_to_check = "/mnt/gold"
mounted_directories = [mount.mountPoint for mount in dbutils.fs.mounts()]

if directory_to_check in mounted_directories:
    print(f"The directory {directory_to_check} is already mounted.")
else:
    # Build the credential-passthrough config only when a mount is actually performed:
    # spark.conf.get() without a default raises when the key is unset, which would
    # otherwise fail this cell even though the mount already exists.
    configs = {
      "fs.azure.account.auth.type": "CustomAccessToken",
      "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
    }
    dbutils.fs.mount(
        source="abfss://gold@youtubeadlsg2.dfs.core.windows.net/",
        # Reuse the checked path so the existence check and the mount can't drift apart.
        mount_point=directory_to_check,
        extra_configs=configs,
    )

The directory /mnt/gold is already mounted.


In [0]:
# Load the gold-layer Delta table from the mounted container.
df = spark.read.load('/mnt/gold/data/', format='delta')

In [0]:
# Peek at the first five rows of the loaded table.
df.show(n=5)

+----+----------------+-----------+---------------+----------------+----------------+-------+-------------+------------+-------------+----------------+------------+-----------------+--------------------------------+-----------------------+------------------------+----------------------+-----------------------+----------------------------+------------+-------------+------------+-----------------------------------+-------------+-----------------+----------------+---------+----------+--------------------+-------------------+--------------------+-------------------+--------------------+------------------------+
|rank|        Youtuber|subscribers|    video_views|        category|           Title|uploads|      Country|Abbreviation| channel_type|video_views_rank|country_rank|channel_type_rank|video_views_for_the_last_30_days|lowest_monthly_earnings|highest_monthly_earnings|lowest_yearly_earnings|highest_yearly_earnings|subscribers_for_last_30_days|created_year|created_month|created_date|Gross

In [0]:
# Subscribers per video view. Rows whose video_views is zero (or null) get a null
# ratio explicitly, rather than relying on division-by-zero semantics, which raise
# an error when spark.sql.ansi.enabled is true.
df = df.withColumn(
    "subscriber_to_view_ratio",
    when(col("video_views") != 0, col("subscribers") / col("video_views")),
)

In [0]:
# Index Country to a numeric label, then one-hot encode that index.
# The result is kept under its own names (onehot_model / df_onehot) instead of
# `df_encoded`. The next cell rebuilds `df_encoded` with a separate StringIndexer,
# so the original assignment here was silently overwritten. Keeping it separate also
# ensures the vector column CountryOneHot, which Spark's CSV writer cannot
# serialize, never reaches the CSV export by accident.
string_indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex")
encoder = OneHotEncoder(inputCol="CountryIndex", outputCol="CountryOneHot")

pipeline = Pipeline(stages=[string_indexer, encoder])
onehot_model = pipeline.fit(df)
df_onehot = onehot_model.transform(df)


In [0]:
# Encode Country as a numeric index. Under StringIndexer's default frequencyDesc
# ordering, the most frequent label gets 0.0. handleInvalid="keep" maps null
# countries to an extra index instead of failing the transform. The default
# "error" aborts on nulls, because fit() ignores them.
indexer = StringIndexer(inputCol="Country", outputCol="CountryEncoded", handleInvalid="keep")
df_encoded = indexer.fit(df).transform(df)


In [0]:
df_encoded.select('country', 'countryEncoded').show(5)

+-------------+--------------+
|      country|countryEncoded|
+-------------+--------------+
|United States|           0.0|
|United States|           0.0|
|          nan|           2.0|
|United States|           0.0|
|        India|           1.0|
+-------------+--------------+
only showing top 5 rows



In [0]:
# Persist the encoded DataFrame as a Delta table, replacing both its data and schema.
# NOTE(review): this path lies inside the /mnt/gold/data/ Delta table loaded earlier.
# Files of a table nested under another table's directory may be removed by VACUUM
# on the parent. Consider a sibling location, and update the CSV-export cell too,
# since it reads this same path.
output_path = '/mnt/gold/data/final/'
df_encoded.write.save(
    output_path,
    format='delta',
    mode='overwrite',
    overwriteSchema='true',
)


In [0]:
# Mount the `final` container of the youtubeadlsg2 storage account at /mnt/final,
# skipping the mount when that mount point is already registered.
directory_to_check = "/mnt/final"
mounted_directories = [mount.mountPoint for mount in dbutils.fs.mounts()]

if directory_to_check in mounted_directories:
    print(f"The directory {directory_to_check} is already mounted.")
else:
    # Build the credential-passthrough config only when a mount is actually performed:
    # spark.conf.get() without a default raises when the key is unset, which would
    # otherwise fail this cell even though the mount already exists.
    configs = {
      "fs.azure.account.auth.type": "CustomAccessToken",
      "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
    }
    dbutils.fs.mount(
        source="abfss://final@youtubeadlsg2.dfs.core.windows.net/",
        # Reuse the checked path so the existence check and the mount can't drift apart.
        mount_point=directory_to_check,
        extra_configs=configs,
    )

In [0]:
from delta.tables import DeltaTable

# Export the encoded Delta table as CSV into the mounted `final` container.
csv_output_path = '/mnt/final/data/csv_data/'

# Open the table through the Delta API, which rejects paths that are not Delta
# tables, and expose it as a regular DataFrame.
delta_table = DeltaTable.forPath(spark, "/mnt/gold/data/final/")
delta_df = delta_table.toDF()

# coalesce(1) funnels every row into one partition, so csv_output_path (a
# directory) holds a single part-file. All data passes through one task.
(
    delta_df
    .coalesce(1)
    .write
    .option('header', 'true')
    .mode('overwrite')
    .csv(csv_output_path)
)

