In [None]:
# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
import pyspark
# URL processing
import urllib
import pyspark.pandas as pd

## Read the credentials and prepare to connect to S3 Bucket

In [None]:
# Specify file type to be csv
file_type = "csv"
# Indicates file has first row as the header
first_row_is_header = "true"
# Indicates file has comma as the delimiter
delimiter = ","
# Read the CSV file to spark dataframe
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Fetch both keys for the user with a single Spark job.
# first() returns None when no row matches, so a missing user is reported
# explicitly instead of surfacing as a bare IndexError.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    raise ValueError("No credentials found for user 'databricks-user' in the credentials file")

# Get the AWS access key and secret key from the selected row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" also escapes '/') so it can be embedded in a URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
# Show the loaded rows WITHOUT the key columns: rendering the access key and
# secret key would persist them in the notebook output.
display(aws_keys_df.drop('Access key ID', 'Secret access key'))

## Connect and Mount S3 Bucket to Databricks File System

In [None]:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0a1d8948160f-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/pin_pipe"
# Source url
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# Collect the existing mount points once. A set gives an exact-match lookup, so a
# different mount whose path merely contains MOUNT_NAME (e.g. "/mnt/pin_pipe_old")
# is not mistaken for this one.
mount_points = {mount.mountPoint for mount in dbutils.fs.mounts()}

# Mount the drive only when it is not mounted yet
if MOUNT_NAME in mount_points:
    print(f'{AWS_S3_BUCKET} is already mounted at {MOUNT_NAME}')
else:
    print('Mouting the S3 Paritions')
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)


In [None]:
# Folder of each topic (pin / geo / user) under the mounted bucket
pin_data_location = f"{MOUNT_NAME}/topics/0a1d8948160f.pin/partition=0/"
geo_data_location = f"{MOUNT_NAME}/topics/0a1d8948160f.geo/partition=0/"
user_data_location = f"{MOUNT_NAME}/topics/0a1d8948160f.user/partition=0/"

# Print how many entries each topic folder contains, in pin, geo, user order
for topic_location in (pin_data_location, geo_data_location, user_data_location):
    print(len(dbutils.fs.ls(topic_location)))


## Read JSON data files for all pinterest posts data

In [None]:
def prepare_dataframe(data_location):
    """Read every ``*.json`` file directly under ``data_location`` into one Spark DataFrame.

    Args:
        data_location: Path of the folder holding the JSON files; a trailing
            slash is accepted.

    Returns:
        The DataFrame produced by ``spark.read.json`` for the folder's JSON files.
    """
    # Strip any trailing '/' so the glob is not built with a double slash.
    # No directory listing is done here: Spark resolves the glob itself.
    return spark.read.json(data_location.rstrip('/') + '/*.json')

In [None]:
### Read All Data for pinterest posts from S3
# Load the pin topic folder into a Spark DataFrame and preview the first 5 rows
df_pin = prepare_dataframe(pin_data_location)
df_pin.show(5)

In [None]:
# pin_dfs = pd.DataFrame(df_pin)

## Clean Pin Spark DataFrame

Replace empty entries and entries with no relevant data in each column with `None` \
Perform the necessary transformations on the follower_count to ensure every entry is a number. \
Make sure the data type of this column is an `int`. \
Ensure that each column containing numeric data has a numeric data type \
Clean the data in the `save_location` column to include only the save location path \
Rename the `index` column to `ind`. \
Reorder the `DataFrame` columns to have the following column order: \
    `ind` \
    `unique_id` \
    `title` \
    `description` \
    `follower_count` \
    `poster_name` \
    `tag_list` \
    `is_image_or_video` \
    `image_src` \
    `save_location` \
    `category`

### Make sure unique ids are correct format

In [None]:
# Keep only the rows whose unique_id is exactly 36 characters long.
# This checks the length only, not the full UUID pattern.
has_uuid_length = length(col('unique_id')) == 36
df_pin = df_pin.where(has_uuid_length)

### Cleanup and transform `follower_count`

#### 1. Remove rows with invalid follower count

In [None]:
# Digits with an optional trailing 'k' or 'M'. The pattern is anchored with ^ and $
# because rlike does a partial (find) match: unanchored, any value that merely
# contains a digit would pass and later break the int() conversion.
follower_regex = r'^[0-9]+[kM]?$'
# Rows with a null follower_count are dropped as well (rlike yields null for them)
df_pin = df_pin.filter(df_pin.follower_count.rlike(follower_regex))
df_pin.count()

#### 2. Convert *kilo* and *Million* to numeric

In [None]:
def transform_followers_count(x) -> int:
    """Convert a follower-count string such as ``'12'``, ``'5k'`` or ``'2M'`` to an integer.

    Args:
        x: Follower count as text, optionally suffixed with ``k`` (thousand)
            or ``M`` (million). ``None`` is passed through unchanged.

    Returns:
        The numeric follower count, or ``None`` when ``x`` is ``None``.

    Raises:
        ValueError: If the numeric part cannot be parsed by ``int``.
    """
    # Null-safe: a missing value stays missing instead of raising AttributeError
    if x is None:
        return None
    multipliers = {'k': 1000, 'M': 1000000}
    # Slicing (not indexing) keeps the empty string on the int() path below
    suffix = x[-1:]
    if suffix in multipliers:
        return multipliers[suffix] * int(x[:-1])
    return int(x)

# Wrap the Python converter as a Spark UDF declared to return IntegerType,
# then overwrite follower_count with the converted numeric value.
transform_followers_udf = udf(transform_followers_count, IntegerType())
df_pin = df_pin.withColumn('follower_count', transform_followers_udf('follower_count'))

### Rename index column to `ind`

In [None]:
# Rename the `index` column to `ind`
df_pin = df_pin.withColumnRenamed('index', 'ind')

### Cleanup the `save_location` columns to keep only the path

In [None]:
# Strip the 'Local save in ' text from save_location so only the path remains
df_pin = df_pin.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ''))

### Reorder the columns

In [None]:
# Project the columns in the required output order
df_pin = df_pin.select(
    'ind',
    'unique_id',
    'title',
    'description',
    'follower_count',
    'poster_name',
    'tag_list',
    'is_image_or_video',
    'image_src',
    'save_location',
    'category',
)

### Replace empty and not applicable with `None`

In [None]:
# Replace the empty string and the listed "not applicable" placeholders with null
df_pin = df_pin.replace(['', 'N/A', 'n/a', 'none', 'None'], None)

### Finally, drop duplicate rows

In [None]:
# Remove rows that are exact duplicates across all columns
df_pin = df_pin.drop_duplicates()

## Read JSON data files for all geolocation data

In [None]:
# Load the geolocation topic folder into a Spark DataFrame and render it
df_geo = prepare_dataframe(geo_data_location)
display(df_geo)

In [None]:
# `pd` is the notebook's alias for pyspark.pandas (see the import cell), not plain pandas.
# NOTE(review): geo_dfs is not referenced anywhere else in the visible notebook —
# confirm whether this cell is still needed.
geo_dfs = pd.DataFrame(df_geo)

## Clean Geo Spark DataFrame
To clean the df_geo DataFrame you should perform the following transformations:

Create a new column `coordinates` that contains an array based on the `latitude` and `longitude` columns.\
Drop the `latitude` and `longitude` columns from the `DataFrame`. \
Convert the `timestamp` column from a string to a `timestamp` data type. \
Reorder the `DataFrame` columns to have the following column order:\
  `ind`\
  `country`\
  `coordinates`\
  `timestamp`

### Creates new column `coordinates` from `latitude` and `longitude` and drops them after creation

In [None]:
# Combine latitude and longitude into one array column, then drop the two source columns
coordinates = array('latitude', 'longitude')
df_geo = (
    df_geo
    .withColumn('coordinates', coordinates)
    .drop('latitude', 'longitude')
)

### Convert `timestamp` column to `timestamp` data type

In [None]:
# Replace the `timestamp` column with the result of to_timestamp on its current values
df_geo = df_geo.withColumn('timestamp', to_timestamp(df_geo.timestamp))

### Reorder the columns as:
`['ind', 'country', 'coordinates', 'timestamp']`

In [None]:
# Project the columns in the required output order
df_geo = df_geo.select('ind', 'country', 'coordinates', 'timestamp')

In [None]:
# Sanity check: show the final column order of the cleaned geo DataFrame
df_geo.columns

## Read JSON data files for all user data

In [None]:
# Load the user topic folder into a Spark DataFrame and list its columns
df_user = prepare_dataframe(user_data_location)
df_user.columns

In [None]:
# Preview the first 10 user rows
df_user.show(10)

## Clean User Spark DataFrame
To clean the df_user DataFrame you should perform the following transformations:

Create a new column `user_name` that concatenates the information found in the `first_name` and `last_name` columns. \
Drop the `first_name` and `last_name` columns from the DataFrame. \
Convert the `date_joined` column from a `string` to a `timestamp` data type. \
Reorder the `DataFrame` columns to have the following column order: \
  `ind`, 
  `user_name`, 
  `age`, 
  `date_joined`


In [None]:
# Build user_name as "<first_name> <last_name>". The literal space keeps the two
# parts readable; without it the names run together (e.g. "JohnSmith").
# concat still yields null when either part is null, as before.
df_user = (
    df_user
    .withColumn('user_name', concat(df_user.first_name, lit(' '), df_user.last_name))
    .drop('first_name', 'last_name')
)

In [None]:
# Replace the `date_joined` column with the result of to_timestamp on its current values
df_user = df_user.withColumn('date_joined', to_timestamp(df_user.date_joined))

In [None]:
# Project the columns in the required output order
df_user = df_user.select('ind', 'user_name', 'age', 'date_joined')

## Analytics Queries

In [None]:
# Print the column names of the three cleaned DataFrames used by the queries below
print('df_pin: \n', df_pin.columns)
print('df_geo: \n', df_geo.columns)
print('df_user: \n', df_user.columns)

### M7-T4 Find the most popular Pinterest category people post to based on their country.

Your query should return a DataFrame that contains the following columns:
- country
- category
- category_count, a new column containing the desired query output

In [None]:
# Inner-join posts and geolocation data on the post index, then count posts per
# (country, category). No sort is applied here: the window below defines the
# ordering inside each country, so sorting first would only be wasted work.
joined = (
    df_pin.join(df_geo, 'ind')
    .groupBy('country', 'category')
    .agg(count('*').alias('category_count'))
)
# Rank the categories inside each country from most to fewest posts.
# `category` is a secondary sort key so that ties on category_count always
# resolve the same way (row_number alone picks an arbitrary row among ties).
window = pyspark.sql.Window.partitionBy('country').orderBy(desc('category_count'), 'category')
ranked = joined.withColumn('rank', row_number().over(window))
# Keep only the top-ranked category of each country
top_category_per_country = ranked.filter(col('rank') == 1).select('country', 'category', 'category_count')
display(top_category_per_country)

### M7-T5 Find most popular category each year
Find how many posts each category had between 2018 and 2022.

Your query should return a DataFrame that contains the following columns:

- `post_year`, a new column that contains only the year from the timestamp column
- `category`
- `category_count`, a new column containing the desired query output


In [None]:
# Restrict the geolocation rows to posts made in 2018..2022 (inclusive) by
# comparing the extracted year. Comparing the timestamp with the string '2017'
# casts it to 2017-01-01 00:00:00, which lets almost all of 2017 through.
geo_2018_2022 = df_geo.filter(year('timestamp').between(2018, 2022))
# Join posts with the filtered geolocation rows on index, then count the posts
# per (post_year, category), listing the latest year and largest counts first.
joined = (
    df_pin.join(geo_2018_2022, 'ind')
    .groupBy(year('timestamp').alias('post_year'), 'category')
    .agg(count('*').alias('category_count'))
    .orderBy('post_year', 'category_count', ascending=False)
)
display(joined)

### M7-T6 User with most followers and the country

#### Step 1: For each country find the user with the most followers.

Your query should return a DataFrame that contains the following columns:

`country`
`poster_name`
`follower_count`

In [None]:
# Same window-ranking pattern as M7-T4.
# A poster appears once per post, so the follower count is aggregated with max:
# summing would multiply a user's followers by the number of their posts.
# The function is referenced through the module to avoid confusion with Python's builtin max.
joined = (
    df_pin.join(df_geo, 'ind')
    .groupby('poster_name', 'country')
    .agg(pyspark.sql.functions.max('follower_count').alias('follower_count'))
)
# Rank the posters inside each country by follower count; poster_name breaks
# ties so the selected row is deterministic.
window = pyspark.sql.Window.partitionBy('country').orderBy(desc('follower_count'), 'poster_name')
ranked = joined.withColumn('rank', row_number().over(window))
# Keep only the most-followed poster of each country
top_user_by_follower_per_country = ranked.filter(col('rank') == 1).select('country', 'poster_name', 'follower_count')

In [None]:
# Render the most-followed poster of each country
display(top_user_by_follower_per_country)

#### Step 2: Based on the above query, find the country with the user with most followers.

Your query should return a DataFrame that contains the following columns:

`country`
`follower_count`

This DataFrame should have only one entry.

In [None]:
# Order the per-country winners by follower count, largest first
user_country_most_followers = (
    top_user_by_follower_per_country
    .select('country', 'follower_count')
    .orderBy(desc('follower_count'))
)

In [None]:
# Only the first row of the ordered result is shown: the single country with the most-followed user
display(user_country_most_followers.limit(1))

### M7-T7 Most popular category for each age group
#### What is the most popular category people post to based on the following age groups:

* 18-24
* 25-35
* 36-50
* +50

Your query should return a DataFrame that contains the following columns:\
`age_group`, a new column based on the original age column \
`category` \
`category_count`, a new column containing the desired query output


In [None]:
# Inner-join the posts with the user data on the shared `ind` key.
# NOTE: no columns are dropped here; the column pruning once tried for speed is disabled.
joined = df_pin.join(df_user, 'ind')


In [None]:
# Bucket each row into an age group. The conditions are evaluated in order and
# every age that falls outside 18..50 lands in the final '50+' branch.
age_group_col = (
    when(joined.age.between(18, 24), lit('18-24'))
    .when(joined.age.between(25, 35), lit('25-35'))
    .when(joined.age.between(36, 50), lit('36-50'))
    .otherwise(lit('50+'))
)
# Count the posts per (age_group, category)
age_demongraphics_categorical = (
    joined
    .withColumn('age_group', age_group_col)
    .groupBy('age_group', 'category')
    .agg(count('*').alias('category_count'))
)

In [None]:
# Rank the categories inside each age group from most to fewest posts
window = (
    pyspark.sql.Window
    .partitionBy('age_group')
    .orderBy(desc('category_count'))
)
ranked = age_demongraphics_categorical.withColumn('rank', row_number().over(window))
# Keep the top-ranked category of every age group
top_category_by_age = (
    ranked
    .where(col('rank') == 1)
    .select('age_group', 'category', 'category_count')
)

In [None]:
# Render the most popular category of each age group
display(top_category_by_age)

### M7-T8 Median Follower Count by Age Group
What is the median follower count for users in the following age groups:

* 18-24
* 25-35
* 36-50
* +50

Your query should return a DataFrame that contains the following columns:
- `age_group`, a new column based on the original `age` column
- `median_follower_count`, a new column containing the desired query output


In [None]:
# Inner-join the posts with the user data on the shared `ind` key
joined = df_pin.join(df_user, 'ind')
# Bucket each row into an age group. The conditions are evaluated in order and
# every age that falls outside 18..50 lands in the final '50+' branch.
age_group_col = (
    when(joined.age.between(18, 24), lit('18-24'))
    .when(joined.age.between(25, 35), lit('25-35'))
    .when(joined.age.between(36, 50), lit('36-50'))
    .otherwise(lit('50+'))
)
# Approximate median (50th percentile) of follower_count per age group
followers_by_age = (
    joined
    .withColumn('age_group', age_group_col)
    .groupBy('age_group')
    .agg(percentile_approx('follower_count', 0.5).alias('median_follower_count'))
    .orderBy('age_group')
)

In [None]:
# Render the median follower count of each age group
display(followers_by_age)

### M7-T9 Users Joining Per Year 2015 - 2020
Find how many users have joined between 2015 and 2020.

Your query should return a DataFrame that contains the following columns:

* `post_year`, a new column that contains only the year from the timestamp column
* `number_users_joined`, a new column containing the desired query output


In [None]:
# Count the users per joining year, restricted to 2015..2020 (inclusive) as the
# task requires; without the filter every joining year would be reported.
users_per_year = (
    df_user
    .withColumn('post_year', year('date_joined'))
    .filter(col('post_year').between(2015, 2020))
    .groupBy('post_year')
    .agg(count('user_name').alias('number_users_joined'))
    .orderBy('post_year')
)


In [None]:
# Render the number of users joined per year
display(users_per_year)

### M7-T10 Median Follower Count of Users based On Joining Year
Find the median follower count of users who joined between 2015 and 2020.
Your query should return a DataFrame that contains the following columns:
- `post_year`, a new column that contains only the year from the timestamp column
- `median_follower_count`, a new column containing the desired query output


In [None]:
# Approximate median (50th percentile) of follower_count per joining year
joined = (
    df_pin.join(df_user, 'ind')
    .withColumn('post_year', year('date_joined'))
    .groupBy('post_year')
    .agg(percentile_approx('follower_count', 0.5).alias('median_follower_count'))
)

# Keep only the joining years from 2015 through 2020
in_requested_years = col('post_year').between(lit('2015'), lit('2020'))
filtered = joined.where(in_requested_years)

In [None]:
# Render the median follower count per joining year (2015-2020)
display(filtered)

### M7-T11 The median follower count of users based on their joining year and age group
Find the median follower count of users that have joined between 2015 and 2020, based on which age group they are part of.\
Your query should return a DataFrame that contains the following columns:

- `age_group`, a new column based on the original `age` column
    * 18-24
    * 25-35
    * 36-50
    * +50
- `post_year`, a new column that contains only the year from the timestamp column
- `median_follower_count`, a new column containing the desired query output


In [None]:
# Inner-join the posts with the user data and derive the joining year
joined = df_pin.join(df_user, 'ind').withColumn('post_year', year('date_joined'))
# Bucket each row into an age group. The conditions are evaluated in order and
# every age that falls outside 18..50 lands in the final '50+' branch.
age_group_col = (
    when(joined.age.between(18, 24), lit('18-24'))
    .when(joined.age.between(25, 35), lit('25-35'))
    .when(joined.age.between(36, 50), lit('36-50'))
    .otherwise(lit('50+'))
)
# Approximate median follower count per (age_group, post_year), restricted to users
# who joined in 2015..2020 (inclusive) as the task requires; without the filter
# every joining year would be reported.
followers_by_age_by_joined_year = (
    joined
    .filter(joined.post_year.between(2015, 2020))
    .withColumn('age_group', age_group_col)
    .groupBy('age_group', 'post_year')
    .agg(percentile_approx('follower_count', 0.5).alias('median_follower_count'))
    .orderBy('age_group', 'post_year')
)

In [None]:
# Render the median follower count per age group and joining year
display(followers_by_age_by_joined_year)

## Unmount S3 Bucket

In [None]:
# Unmount the mount point named by MOUNT_NAME now that the analysis is finished
dbutils.fs.unmount(MOUNT_NAME)