# Batch Processing: Spark on Databricks
## Mount AWS S3 bucket to Databricks

Confirm the location of authentication_credentials.csv

In [None]:
# List the uploaded files so the location of the credentials file can be checked
uploaded_tables_dir = "/FileStore/tables/"
dbutils.fs.ls(uploaded_tables_dir)

### Read the csv file containing the AWS keys to Databricks

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import urllib

# Location of the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the Delta table into a Spark DataFrame
delta_reader = spark.read.format("delta")
aws_keys_df = delta_reader.load(delta_table_path)

In [None]:
# Get the AWS access key and secret key from the Spark dataframe
# `import urllib` alone does not guarantee that the `urllib.parse` submodule is loaded
import urllib.parse

# Collect once so that both credentials come from the same row and only one Spark job runs
aws_key_rows = aws_keys_df.select('Access key ID', 'Secret access key').collect()
if not aws_key_rows:
    raise ValueError("The authentication_credentials table contains no AWS keys")
ACCESS_KEY = aws_key_rows[0]['Access key ID']
SECRET_KEY = aws_key_rows[0]['Secret access key']
# URL-encode the secret key; safe="" means "/" is escaped as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

## Mount the S3 bucket to local workspace

In [None]:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0ecf5ea19ac5-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/0ecf5ea19ac5_s3_mount"
# Source url built from the access key, the URL-encoded secret key and the bucket name
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive only when the mount point does not exist yet, so that
# re-running this cell does not fail on an already mounted directory
already_mounted = any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

## Confirm that we can read data from the mounted S3 bucket

In [None]:
# List one topic partition to confirm that the mounted bucket can be read
geo_partition_dir = "/mnt/0ecf5ea19ac5_s3_mount/topics/0ecf5ea19ac5.geo/partition=0/"
geo_partition_files = dbutils.fs.ls(geo_partition_dir)
display(geo_partition_files)

##  Create the following three dataframes: 
## df_pin, df_geo and df_user from S3 data

Re-run from this point after loading fresh data


In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import urllib

# Map each dataframe name to the topic folder that holds its JSON files
topics = {'df_pin': '0ecf5ea19ac5.pin', 'df_geo': '0ecf5ea19ac5.geo', 'df_user': '0ecf5ea19ac5.user'}
# File type and schema inference are the same for every topic, so set them once
file_type = "json"
# Ask Spark to infer the schema
infer_schema = "true"

# Collect the loaded dataframes by name instead of overwriting the loop variable
# and dispatching on substrings of the topic name
dataframes = {}
for df_name, topic in topics.items():
    # Asterisk(*) selects every file with a .json extension in the partition folder
    file_location = f"/mnt/0ecf5ea19ac5_s3_mount/topics/{topic}/partition=0/*.json"
    # Read in JSONs from mounted S3 bucket
    dataframes[df_name] = (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .load(file_location)
    )
    # Display Spark dataframe to check its content
    display(dataframes[df_name])

df_pin = dataframes['df_pin']
df_geo = dataframes['df_geo']
df_user = dataframes['df_user']
    

# Data Cleaning

## Data cleaning for df_pin<br>

- Remove duplicate rows in the dataframe
- Rename the column index to ind
- Re-order the column names in the dataframe
- Replace the values of the follower_count column wherever necessary and convert the column to an integer data type
- Remove any additional strings from the save_location column
- Replace all the NA with None
- Drop the rows where all columns have null values

In [None]:
#df_pin data clean
df_pin = df_pin.dropDuplicates()
df_pin = df_pin.withColumnRenamed('index', 'ind')
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", 
"tag_list", "is_image_or_video", "image_src", "save_location", "category", "downloaded")
# Expand the "k" / "M" suffixes. The patterns are anchored to the end of the value;
# a character class such as '[%k]' would replace every single 'k' or '%' character instead.
df_pin = df_pin.withColumn('follower_count', regexp_replace('follower_count', 'k$', '000'))
df_pin = df_pin.withColumn('follower_count', regexp_replace('follower_count', 'M$', '000000'))
# Blank out the error placeholder as a whole phrase (not character by character)
df_pin = df_pin.withColumn('follower_count', regexp_replace('follower_count', '^User Info Error$', ''))
df_pin = df_pin.withColumn('follower_count', df_pin['follower_count'].cast(IntegerType()))
df_pin = df_pin.withColumn('save_location', regexp_replace('save_location', 'Local save in *', ''))
# na.fill / na.drop return a new DataFrame, so the result has to be assigned back
df_pin = df_pin.na.fill('None', ['is_image_or_video', 'image_src'])
df_pin = df_pin.na.drop(how = "all")
display(df_pin)

## Data cleaning for df_geo
- Remove duplicate rows in the dataframe
- Create a new column coordinates holding an array of the latitude and longitude values, then drop those two columns
- Convert the timestamp column into a timestamp data type
- Re-order the column names in the dataframe

In [None]:
# Clean df_geo: drop duplicate rows, combine latitude/longitude into one array
# column, cast timestamp to a timestamp type and put the columns in their final order
df_geo = (
    df_geo.dropDuplicates()
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop('latitude', 'longitude')
)
df_geo = (
    df_geo.withColumn("timestamp", df_geo["timestamp"].cast(TimestampType()))
    .select("ind", "country", "coordinates", "timestamp")
)
display(df_geo)

## Data cleaning for df_user
- Remove duplicate rows in the dataframe
- Create a new column user_name by combining the first_name and last_name columns, then drop those two columns
- Convert the date_joined column into a timestamp data type
- Re-order the column names in the dataframe

In [None]:
# Clean df_user: drop duplicate rows, build user_name from first_name and last_name,
# cast date_joined to a timestamp type and put the columns in their final order
df_user = (
    df_user.dropDuplicates()
    .withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
    .drop("first_name", "last_name")
)
df_user = (
    df_user.withColumn('date_joined', df_user['date_joined'].cast(TimestampType()))
    .select("ind", "user_name", "age", "date_joined")
)
display(df_user)

# Pinterest Business Intelligence


1. Create initial setup for subsequent queries.<br>
2. There are two queries per report: one using the PySpark DataFrame API and one using regular SQL  

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# Register each cleaned dataframe as a temporary view for the SQL queries
for view_name, frame in (("geo_table", df_geo), ("pin_table", df_pin), ("user_table", df_user)):
    frame.createOrReplaceTempView(view_name)

1. Find the most popular Pinterest category people post to based on their country


In [None]:
# PySpark DataFrame API version

# Window: one partition per country, largest category_count first
windowCountryByCatCount = Window.partitionBy("country").orderBy(col("category_count").desc())

# Count the non-null category values per (country, category) pair
category_counts_by_country = (
    df_pin.join(df_geo, df_pin.ind == df_geo.ind)
    .groupBy("country", "category")
    .agg(count("category").alias("category_count"))
)

# Keep only the first-ranked category of each country
top_category_by_country = (
    category_counts_by_country
    .withColumn("rank", row_number().over(windowCountryByCatCount))
    .filter(col("rank") == 1)
    .drop("rank")
)
top_category_by_country.show()


In [None]:
# Spark SQL version: rank the categories inside each country by post count
# and keep the first-ranked category per country

top_category_by_country_sql = """
                      
    WITH Ranked as (
    SELECT 
        geo_table.country AS country, 
        pin_table.category AS category, 
        count(pin_table.category) AS category_count,
        ROW_NUMBER() OVER(PARTITION BY geo_table.country ORDER BY count(pin_table.category) DESC) category_rank
    FROM 
        geo_table
    INNER JOIN 
        pin_table ON geo_table.ind = pin_table.ind
    GROUP BY 
        geo_table.country, 
        pin_table.category 
    )   
        SELECT
            country,
            category,
            category_count
        FROM
            Ranked
        WHERE
            category_rank = 1
"""

result_df = spark.sql(top_category_by_country_sql)
display(result_df)

2. Find the most popular category in each year between 2018 and 2022.


In [None]:
# PySpark DataFrame API version

# Window: one partition per post_year, largest category_count first
windowYearByCatCount = Window.partitionBy("post_year").orderBy(col("category_count").desc())

# Join pins to geo data, derive the year from the timestamp and keep 2018-2022 only
year_in_range = (col("post_year") >= 2018) & (col("post_year") <= 2022)
posts_2018_to_2022 = (
    df_pin.join(df_geo, df_pin.ind == df_geo.ind)
    .withColumn("post_year", year("timestamp"))
    .filter(year_in_range)
)

# Count categories per year, then keep the first-ranked category of each year
top_category_by_year = (
    posts_2018_to_2022
    .groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(windowYearByCatCount))
    .filter(col("rank") == 1)
    .drop("rank")
)
top_category_by_year.show()


In [None]:
# regular SQL
# Rank the categories inside each year (2018-2022) by post count and keep the
# first-ranked one. GROUP BY already yields one row per (year, category), so no
# DISTINCT is needed, and the rank is compared with the standard SQL "=" operator.

result_df = spark.sql("""

WITH Ranked AS (
    SELECT
        YEAR(geo_table.timestamp) AS post_year,
        pin_table.category AS category,
        COUNT(pin_table.category) AS category_count,
        ROW_NUMBER() OVER(PARTITION BY YEAR(geo_table.timestamp) ORDER BY COUNT(pin_table.category) DESC) category_rank
    FROM
        geo_table
    INNER JOIN
        pin_table ON geo_table.ind = pin_table.ind
    WHERE
        YEAR(geo_table.timestamp) >= 2018 AND YEAR(geo_table.timestamp) <= 2022
    GROUP BY
        YEAR(geo_table.timestamp), pin_table.category
)
    SELECT
        post_year, category, category_count
    FROM
        Ranked
    WHERE
        category_rank = 1

""")

display(result_df)
     

3. Step 1: For each country find the user with the most followers.<br>
   Step 2: Based on the above query, find the country with the user with most followers.

In [None]:
#pyspark sql

# Find user with most followers in each country
# create partition by country and order by follower_count descending
windowCountryByFollowers = Window.partitionBy("country").orderBy(col("follower_count").desc())

# find the user with the most followers in each country
max_followers_by_country = (
    df_pin.join(df_geo, df_pin.ind == df_geo.ind)
    .withColumn("rank", row_number().over(windowCountryByFollowers))
    .filter(col("rank") == 1)
    .select("country", "poster_name", "follower_count")
    .orderBy(col("country"))
)

# find the country with the user with most followers
# (the previous, unused collect() of the overall maximum was removed: it only
# triggered an extra Spark job and its result was never read)
country_with_max_followers = (
    max_followers_by_country
    .select("country", "follower_count")
    .orderBy(col("follower_count").desc())
    .limit(1)
)

max_followers_by_country.show()
country_with_max_followers.show()


In [None]:
# Spark SQL version, STEP 1: rank the posters inside each country by
# follower_count and keep the first-ranked poster per country

max_followers_by_country_sql = """
                      
WITH Ranked AS (
    
    SELECT 
        pin_table.poster_name AS poster,
        geo_table.country AS country,
        pin_table.follower_count AS follower_count,
        ROW_NUMBER() OVER (PARTITION BY geo_table.country ORDER BY pin_table.follower_count DESC) AS rank
    FROM
        geo_table
    INNER JOIN pin_table ON geo_table.ind = pin_table.ind
)

    SELECT
        country, poster, follower_count
    FROM
        Ranked
    WHERE
        rank = 1
    ORDER BY 
        country;
        
"""

result_df = spark.sql(max_followers_by_country_sql)
display(result_df)

In [None]:
# Spark SQL version, STEP 2: from the first-ranked poster of every country,
# keep the single row with the highest follower_count

country_with_max_followers_sql = """
                      
WITH Ranked AS (
    
    SELECT 
        pin_table.poster_name AS poster,
        geo_table.country AS country,
        pin_table.follower_count AS follower_count,
        ROW_NUMBER() OVER (PARTITION BY geo_table.country ORDER BY pin_table.follower_count DESC) AS rank
    FROM
        geo_table
    INNER JOIN pin_table ON geo_table.ind = pin_table.ind
)

    SELECT
        country, follower_count
    FROM
        Ranked
    WHERE
        rank = 1
    ORDER BY 
        follower_count DESC  
    LIMIT 1
    
"""

result_df = spark.sql(country_with_max_followers_sql)
display(result_df)

4. Find the most popular category people post to, based on the age groups - 18-24, 25-35, 36-50, +50

In [None]:
# PySpark DataFrame API version

# SQL CASE expression that maps age to one of the four age-group labels
age_group_case = """case
    when age BETWEEN 18 AND 24 THEN '18-24'
    when age BETWEEN 25 AND 35 THEN '25-35'
    when age BETWEEN 36 AND 50 THEN '36-50'
    when age> 50 THEN '+50'
    END
    """

# Join pins to users on ind and attach the age_group label
pin_user_age_group = df_pin.join(df_user, 'ind').withColumn('age_group', expr(age_group_case))

# Window: one partition per age_group, largest category_count first
windowAgeGroup = Window.partitionBy("age_group").orderBy(col("category_count").desc())

# Count categories per age group, then keep the first-ranked category of each group
category_counts_by_age_group = (
    pin_user_age_group.groupBy("category", "age_group")
    .agg(count("category").alias("category_count"))
)
top_category_by_age_group = (
    category_counts_by_age_group
    .withColumn("rank", row_number().over(windowAgeGroup))
    .filter(col("rank") == 1)
    .drop("rank")
)
top_category_by_age_group.show()
    

In [None]:
# Spark SQL version: count categories per age group, rank them inside each
# age group and keep the first-ranked category per group

top_category_by_age_group_sql = """
WITH Ranked AS (
              
    SELECT 
        pin_table.category as category,
        CASE
            WHEN user_table.age BETWEEN 18 AND 24 THEN '18-24'
            when user_table.age BETWEEN 25 AND 35 THEN '25-35'
            when user_table.age BETWEEN 36 AND 50 THEN '36-50'
            when user_table.age > 50 THEN '+50'
        END as age_group,
        COUNT(pin_table.category) AS category_count
    FROM
        pin_table
    INNER JOIN 
        user_table ON pin_table.ind = user_table.ind   
    GROUP BY
        pin_table.category, age_group
    ORDER BY
        age_group, category_count DESC
        
),

Ranked_Window as (
    
    SELECT
        category, age_group, category_count,
        ROW_NUMBER() OVER (PARTITION BY age_group ORDER BY category_count DESC) AS rank
    FROM
        Ranked
)

    SELECT 
        category, age_group, category_count
    FROM
        Ranked_Window
    WHERE
        rank = 1
    
"""

result_df = spark.sql(top_category_by_age_group_sql)
display(result_df)

5. Find the median follower count for users in the age groups, 18-24, 25-35, 36-50, +50

In [None]:
# PySpark DataFrame API version

# Approximate median (50th percentile) of follower_count per age group
median_followers_by_age_group = (
    pin_user_age_group
    .select("age_group", "follower_count")
    .groupBy("age_group")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("age_group")
)
median_followers_by_age_group.show()
    

In [None]:
# Spark SQL version: split each age group into two halves with NTILE(2), once in
# ascending and once in descending follower_count order, then average the largest
# value of the ascending first half and the smallest value of the descending first half

median_followers_by_age_group_sql = """
                      
WITH  CTE AS 
(
    
    SELECT 
        CASE
            WHEN user_table.age BETWEEN 18 AND 24 THEN '18-24'
            when user_table.age BETWEEN 25 AND 35 THEN '25-35'
            when user_table.age BETWEEN 36 AND 50 THEN '36-50'
            when user_table.age > 50 THEN '+50'
        END as age_group,
        pin_table.follower_count as follower_count
    FROM
        pin_table
    INNER JOIN 
        user_table ON pin_table.ind = user_table.ind
    WHERE
        pin_table.follower_count IS NOT NULL
    ORDER BY
        age_group
        
), 
    
CTE2 AS (
    SELECT 
        CTE.age_group, CTE.follower_count,
        NTILE(2) OVER(PARTITION BY CTE.age_group ORDER BY CTE.follower_count) as half1, 
        NTILE(2) OVER(PARTITION BY CTE.age_group ORDER BY CTE.follower_count DESC) as half2
    FROM
        CTE
)
    
    SELECT  CTE2.age_group,
            ROUND((MAX(CASE WHEN CTE2.half1 = 1 THEN CTE2.follower_count END) + 
            MIN(CASE WHEN CTE2.half2 = 1 THEN CTE2.follower_count END)) / 2.0) as median_follower_count
    FROM    CTE2
    GROUP BY CTE2.age_group;
    
"""

result_df = spark.sql(median_followers_by_age_group_sql)
display(result_df)

6. Find how many users have joined between 2015 and 2020.

In [None]:
# PySpark DataFrame API version

# Count the distinct ind values per joining year, then keep the years 2015-2020
users_joined_per_year = (
    df_user
    .groupBy(year('date_joined').alias('post_year'))
    .agg(count_distinct('ind').alias('number_users_joined'))
    .select('post_year', 'number_users_joined')
    .where(col('post_year').between('2015', '2020'))
)
users_joined_per_year.show()
    

In [None]:
# standard sql
# Number of users that joined in each year from 2015 to 2020.
# COUNT(DISTINCT ind) matches the count_distinct('ind') used by the PySpark
# version of this report; GROUP BY already yields one row per year, so the
# former SELECT DISTINCT was redundant.

result_df = spark.sql("""

    SELECT
        YEAR(date_joined) as post_year,
        COUNT(DISTINCT ind) as number_of_users_joined
    FROM
        user_table
    WHERE
        YEAR(date_joined) BETWEEN 2015 AND 2020
    GROUP BY
        YEAR(date_joined)
    ORDER BY
        post_year

""")

display(result_df)

7. Find the median follower count of users who have joined between 2015 and 2020.

In [None]:
# PySpark DataFrame API version

# Approximate median follower_count per joining year, restricted to 2015-2020
median_followers_by_join_year = (
    df_user.join(df_pin, 'ind')
    .groupBy(year('date_joined').alias('post_year'))
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .select('post_year', 'median_follower_count')
    .where(col('post_year').between('2015', '2020'))
)
median_followers_by_join_year.show()
    

In [None]:
# standard sql
# Median follower count per joining year (2015-2020), computed by splitting each
# year into two halves with NTILE(2) in ascending and descending order.
# Rows with a NULL follower_count are filtered out first: they would still be
# assigned to an NTILE half and shift the boundary between the halves, which
# skews the median. The ORDER BY that used to sit inside the first CTE had no
# effect on the final result, so the ordering is applied to the outer query.

result_df = spark.sql("""

WITH CTE AS (
    SELECT
        YEAR(user_table.date_joined) as post_year,
        pin_table.follower_count as follower_count
    FROM
        user_table
    INNER JOIN
        pin_table ON user_table.ind = pin_table.ind
    WHERE
        YEAR(user_table.date_joined) BETWEEN 2015 AND 2020
        AND pin_table.follower_count IS NOT NULL
),

CTE2 AS (
    SELECT
        CTE.post_year, CTE.follower_count,
        NTILE(2) OVER(PARTITION BY CTE.post_year ORDER BY CTE.follower_count) as half1,
        NTILE(2) OVER(PARTITION BY CTE.post_year ORDER BY CTE.follower_count DESC) as half2
    FROM
        CTE
)

    SELECT  CTE2.post_year,
            ROUND((MAX(CASE WHEN CTE2.half1 = 1 THEN CTE2.follower_count END) +
            MIN(CASE WHEN CTE2.half2 = 1 THEN CTE2.follower_count END)) / 2.0) as median_follower_count
    FROM    CTE2
    GROUP BY CTE2.post_year
    ORDER BY CTE2.post_year

""")

display(result_df)

8. Find the median follower count of users who have joined between 2015 and 2020, based on age group that they are part of.

In [None]:
# PySpark DataFrame API version

# SQL CASE expression that maps age to one of the four age-group labels
age_group_case_by_year = """case
        when age BETWEEN 18 AND 24 THEN '18-24'
        when age BETWEEN 25 AND 35 THEN '25-35'
        when age BETWEEN 36 AND 50 THEN '36-50'
        when age> 50 THEN '+50'
        END
        """

# Join pins to users on ind and attach the age_group label
pins_with_age_group = df_pin.join(df_user, 'ind').withColumn('age_group', expr(age_group_case_by_year))

# Approximate median follower_count per (joining year, age group), years 2015-2020
median_followers_by_year_and_age = (
    pins_with_age_group
    .groupBy(year('date_joined').alias('post_year'), 'age_group')
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .select('post_year', 'age_group', 'median_follower_count')
    .where(col('post_year').between('2015', '2020'))
    .orderBy('post_year', 'age_group')
)
median_followers_by_year_and_age.show()


In [None]:
#standard sql
# Median follower count per (joining year, age group) for 2015-2020, computed by
# splitting each group into two halves with NTILE(2) in ascending and descending order.
# Rows with a NULL follower_count are filtered out first: they would still be
# assigned to an NTILE half and shift the boundary between the halves, which
# skews the median. The ORDER BY that used to sit inside the first CTE had no
# effect on the final result, so the ordering is applied to the outer query.

result_df = spark.sql("""
    WITH CTE AS (
        SELECT
            YEAR(user_table.date_joined) as post_year,
            CASE
                WHEN user_table.age BETWEEN 18 AND 24 THEN '18-24'
                when user_table.age BETWEEN 25 AND 35 THEN '25-35'
                when user_table.age BETWEEN 36 AND 50 THEN '36-50'
                when user_table.age > 50 THEN '+50'
            END as age_group,
            pin_table.follower_count as follower_count
        FROM
            pin_table
        INNER JOIN
            user_table ON pin_table.ind = user_table.ind
        WHERE
            YEAR(user_table.date_joined) BETWEEN 2015 AND 2020
            AND pin_table.follower_count IS NOT NULL
     ),

    CTE2 AS (
        SELECT
            CTE.post_year, CTE.age_group, CTE.follower_count,
            NTILE(2) OVER(PARTITION BY CTE.post_year, CTE.age_group ORDER BY CTE.follower_count) as half1,
            NTILE(2) OVER(PARTITION BY CTE.post_year, CTE.age_group ORDER BY CTE.follower_count DESC) as half2
        FROM
            CTE
    )

    SELECT  CTE2.post_year, CTE2.age_group,
            ROUND((MAX(CASE WHEN CTE2.half1 = 1 THEN CTE2.follower_count END) +
            MIN(CASE WHEN CTE2.half2 = 1 THEN CTE2.follower_count END)) / 2.0) as median_follower_count
    FROM    CTE2
    GROUP BY CTE2.post_year, CTE2.age_group
    ORDER BY CTE2.post_year, CTE2.age_group

""")

display(result_df)