# **Connect to Azure Data Lake Storage Gen2**




In [None]:
# OAuth 2.0 client-credentials (service principal) access to the ADLS Gen2 account.
# The account, app registration and tenant are named once here instead of being
# repeated inside every configuration key.
STORAGE_ACCOUNT = "mxh"
CLIENT_ID = "8b09a262-1ab7-4a30-a982-dfab6de4b3fc"
TENANT_ID = "2dff09ac-2b3b-4182-9953-2b548e0d0b39"

# The client secret is read from a Databricks secret scope, never written in the notebook.
service_credential = dbutils.secrets.get(scope="mxh-key",key="mxh-secret")

account_host = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"

# Setting prefix -> value; each key is suffixed with the account host below.
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": CLIENT_ID,
    "fs.azure.account.oauth2.client.secret": service_credential,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
}
for setting, value in oauth_settings.items():
    spark.conf.set(f"{setting}.{account_host}", value)

# **Create 4 dataframes for 4 partitions**

In [None]:
# Load each of the four partition CSVs from the "output" container into its own DataFrame.
# No header/schema options are set, so the reader's defaults apply.
partition_paths = [
    "abfss://output@mxh.dfs.core.windows.net/facebook_partition_1.csv",
    "abfss://output@mxh.dfs.core.windows.net/facebook_partition_2.csv",
    "abfss://output@mxh.dfs.core.windows.net/facebook_partition_3.csv",
    "abfss://output@mxh.dfs.core.windows.net/facebook_partition_4.csv",
]
df1, df2, df3, df4 = [spark.read.format("csv").load(path) for path in partition_paths]

In [None]:
# Stack the four partition DataFrames into a single DataFrame (union matches columns by position).
df_facebook = df1
for partition_df in (df2, df3, df4):
    df_facebook = df_facebook.union(partition_df)

# Row count of the combined data, displayed as the cell output.
df_facebook.count()

88234

# **Install module 'faker' and import libraries**

In [None]:
# Install module 'faker'
!pip install faker

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Collecting faker
  Downloading Faker-20.1.0-py3-none-any.whl (1.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.7/1.7 MB 23.7 MB/s eta 0:00:00
Installing collected packages: faker
Successfully installed faker-20.1.0
Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


In [None]:
# Import libraries
from pyspark import SparkContext, SparkConf
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit, col, when, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
from faker import Faker

# **Functions to randomly generate name and age**

In [None]:
# Generate random user names and return them as a DataFrame
def create_df_name(faker, num_records=4039):
    """Build a single-column DataFrame of randomly generated user names.

    Args:
        faker: Object exposing a ``name()`` method (e.g. a ``Faker`` instance).
        num_records: How many names to generate. Defaults to 4039, the value
            that used to be hard-coded here.

    Returns:
        The DataFrame produced by ``spark.createDataFrame`` with one column, "UserName".

    Raises:
        ValueError: If ``num_records`` is smaller than 1.
    """
    # Only column names are passed to createDataFrame, so the column type has to
    # be inferred from the rows; reject an empty request with a clear message.
    if num_records < 1:
        raise ValueError("num_records must be at least 1")

    rows = [(faker.name(),) for _ in range(num_records)]
    return spark.createDataFrame(rows, ["UserName"])


# Generate random user ages and return them as a DataFrame
def create_df_age(faker, num_records=4039):
    """Build a single-column DataFrame of randomly generated user ages.

    Args:
        faker: Object exposing ``random_int(min=..., max=...)`` (e.g. a ``Faker`` instance).
        num_records: How many ages to generate. Defaults to 4039, the value
            that used to be hard-coded here.

    Returns:
        The DataFrame produced by ``spark.createDataFrame`` with one column, "UserAge".

    Raises:
        ValueError: If ``num_records`` is smaller than 1.
    """
    # Only column names are passed to createDataFrame, so the column type has to
    # be inferred from the rows; reject an empty request with a clear message.
    if num_records < 1:
        raise ValueError("num_records must be at least 1")

    # Ages are requested from faker with bounds 18 and 25.
    rows = [(faker.random_int(min=18, max=25),) for _ in range(num_records)]
    return spark.createDataFrame(rows, ["UserAge"])

# **Functions to map generated user names and ages onto FromUser and ToUser**


In [None]:
# Rename the '_c0' and '_c1' columns
def change_column_name(dataframe, column_0, column_1):
    """Return ``dataframe`` with '_c0' renamed to ``column_0`` and '_c1' renamed to ``column_1``."""
    return (
        dataframe
        .withColumnRenamed('_c0', column_0)
        .withColumnRenamed('_c1', column_1)
    )

# Rename the joined 'UserName' / 'UserAge' columns
def change_column_join(dataframe, column_0, column_1):
    """Return ``dataframe`` with 'UserName' renamed to ``column_0`` and 'UserAge' renamed to ``column_1``."""
    return (
        dataframe
        .withColumnRenamed('UserName', column_0)
        .withColumnRenamed('UserAge', column_1)
    )


# Attach name and age to the FromUser side of each edge
def match_data_userfrom(df_facebook_graph, df_name, df_age):
    """Left-join ``df_name`` and then ``df_age`` onto ``df_facebook_graph`` by FromUser.

    "FromUser" is first cast to int. Each lookup frame is joined on
    ``FromUser == Id`` and its "Id" column is dropped after the join, so the
    result carries the lookup's remaining columns next to the original ones.
    """
    key = "FromUser"
    enriched = df_facebook_graph.withColumn(key, df_facebook_graph[key].cast("int"))

    # Same join recipe for both lookups: names first, then ages.
    for lookup in (df_name, df_age):
        enriched = enriched.join(lookup, enriched[key] == lookup["Id"], "left").drop("Id")

    return enriched

# Attach name and age to the ToUser side of each edge
def match_data_userto(df_facebook_graph, df_name, df_age):
    """Left-join ``df_name`` and then ``df_age`` onto ``df_facebook_graph`` by ToUser.

    "ToUser" is first cast to int. Each lookup frame is joined on
    ``ToUser == Id`` and its "Id" column is dropped after the join, so the
    result carries the lookup's remaining columns next to the original ones.
    """
    key = "ToUser"
    enriched = df_facebook_graph.withColumn(key, df_facebook_graph[key].cast("int"))

    # Same join recipe for both lookups: names first, then ages.
    for lookup in (df_name, df_age):
        enriched = enriched.join(lookup, enriched[key] == lookup["Id"], "left").drop("Id")

    return enriched

# Helper: give a lookup frame a stable, zero-based 'Id' column
def _with_row_id(frame):
    """Return ``frame`` with an added zero-based 'Id' column that follows its row order.

    The ordering key is materialised as a real column before numbering.
    Ordering the window by a constant (as was done previously) leaves every row
    tied, so the row_number() assignment is arbitrary and can differ each time
    the lazy plan is evaluated.
    """
    order_col = "_row_order"
    ordered = frame.withColumn(order_col, monotonically_increasing_id())
    # row_number() starts at 1; subtract 1 so the ids start at 0.
    # The window has no partitionBy, so numbering runs over the whole frame.
    numbered = ordered.withColumn("Id", row_number().over(Window.orderBy(order_col)) - 1)
    return numbered.drop(order_col)


# Create dataframe df_facebook_graph
def join_dataframe(df_facebook_graph, df_name, df_age):
    """Enrich the edge list with a name and an age for both endpoints of every edge.

    Args:
        df_facebook_graph: DataFrame with "FromUser" and "ToUser" columns.
        df_name: DataFrame with a "UserName" column.
        df_age: DataFrame with a "UserAge" column.

    Returns:
        ``df_facebook_graph`` with the added columns "UserNameFrom",
        "UserAgeFrom", "UserNameTo" and "UserAgeTo".
    """
    # Number the lookup rows so they can be joined against the user ids.
    df_name = _with_row_id(df_name)
    df_age = _with_row_id(df_age)

    # FromUser side: join, then rename the generic columns so the ToUser
    # join below can add 'UserName' / 'UserAge' again without a clash.
    df_facebook_graph = match_data_userfrom(df_facebook_graph, df_name, df_age)
    df_facebook_graph = change_column_join(df_facebook_graph, 'UserNameFrom', 'UserAgeFrom')

    # ToUser side: same lookups, renamed with the "To" suffix.
    df_facebook_graph = match_data_userto(df_facebook_graph, df_name, df_age)
    df_facebook_graph = change_column_join(df_facebook_graph, 'UserNameTo', 'UserAgeTo')

    return df_facebook_graph



# **Using functions above to create the final data file**

In [None]:
# NOTE: this cell rebinds df_facebook, so it is meant to be run once after the
# union cell; re-running it would feed already-enriched data back into the join.

# Give the two unnamed CSV columns their real names
column1 = "FromUser"
column2 = "ToUser"
df_facebook = change_column_name(df_facebook, column1, column2)

# Seed Faker before creating the generator so the synthetic names and ages are
# the same on every run instead of changing with each execution.
FAKER_SEED = 42
Faker.seed(FAKER_SEED)

# Generate the synthetic user attributes
faker = Faker()
df_name = create_df_name(faker)
df_age = create_df_age(faker)

# Join the generated names and ages onto both ends of every edge
df_facebook = join_dataframe(df_facebook, df_name, df_age)

# Preview the first 100 rows
df_facebook.show(100)

+--------+------+---------------+-----------+--------------------+---------+
|FromUser|ToUser|   UserNameFrom|UserAgeFrom|          UserNameTo|UserAgeTo|
+--------+------+---------------+-----------+--------------------+---------+
|       0|     1|Laura Frederick|         21|      Robert Maxwell|       24|
|       0|     2|Laura Frederick|         21|       Sharon Murray|       21|
|       0|     3|Laura Frederick|         21|        Brian Martin|       23|
|       0|     4|Laura Frederick|         21|      Larry Townsend|       24|
|       0|     5|Laura Frederick|         21|         Johnny Kidd|       25|
|       0|     6|Laura Frederick|         21|     Heather Hawkins|       25|
|       0|     7|Laura Frederick|         21|     Brittney Wilson|       19|
|       0|     8|Laura Frederick|         21|          Nicole May|       23|
|       0|     9|Laura Frederick|         21|      Tara Mcconnell|       23|
|       0|    10|Laura Frederick|         21|      Michelle Hines|       23|

# **Load processed data into Storage (container "processed")**




### **Option 1**

In [None]:
# Write the processed data as CSV with a header row into the "processed" container,
# replacing anything already stored at that path. The DataFrame is written with
# its current partitioning (no coalesce).
(
    df_facebook.write
    .option("header", True)
    .mode("overwrite")
    .csv("abfss://processed@mxh.dfs.core.windows.net/facebook_processed")
)

### **Option 2**

In [None]:
# Write the processed data as CSV with a header row after coalescing to a single
# partition. The destination and "overwrite" mode are the same as in Option 1,
# so running both options leaves only this cell's output at that path.
(
    df_facebook.coalesce(1)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("abfss://processed@mxh.dfs.core.windows.net/facebook_processed")
)