In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import *
from pyspark.sql import DataFrame

In [0]:
def load_dataframe(filepath: str) -> DataFrame:
    '''
    Loads in json objects from S3 bucket into dataframe.

    Spark's JSON source infers the schema from the data on its own, so no
    schema option is passed ("inferSchema" is a CSV reader option and has no
    effect on JSON reads).

    Params: filepath -> Location of topic within S3 Bucket (glob patterns such as *.json allowed)
    Returns: df -> Dataframe of combined json objects for topic
    '''
    return spark.read.json(filepath)

# All three topics live under the same mount prefix; only the topic suffix differs.
TOPICS_PREFIX = '/mnt/paaaaaat/topics/12baff1ff207'

pin = load_dataframe(f'{TOPICS_PREFIX}.pin/partition=0/*.json')
geo = load_dataframe(f'{TOPICS_PREFIX}.geo/partition=0/*.json')
user = load_dataframe(f'{TOPICS_PREFIX}.user/partition=0/*.json')


In [0]:
# Inspect the inferred schema of the raw pin data before cleaning it.
pin.printSchema()

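Placeholder values in the raw pin data that represent missing information and should be treated as null:

- `description`: `No description available`
- `description`: `No description available Story format`
- `follower_count`, `poster_name`: `User Info Error`
- `tag_list`: `N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e`
- `title`: `No Title Data Available`
- `image_src`: `Image src error.`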


In [0]:
def clean_pin_df(pin_df: DataFrame) -> DataFrame:
    '''
    Cleans the raw pin dataframe.

    - Replaces known placeholder strings (e.g. 'User Info Error') with null in every string column
    - Converts follower_count text such as '950', '12k' or '1.2M' to int
    - Casts index to int and renames it to ind
    - Strips the 'Local save in ' prefix from save_location
    - Selects the output columns in a fixed order

    Params: pin_df -> Raw pin dataframe as returned by load_dataframe
    Returns: pin_df -> Cleaned pin dataframe
    '''
    # Placeholder values that stand in for missing data. Keep the commas: adjacent
    # string literals without a comma are silently concatenated by Python.
    misc_rows = [
        'No description available',
        'No description available Story format',
        'User Info Error',
        'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
        'No Title Data Available',
        'Image src error.',
    ]
    # The placeholders are text, so only string columns are checked; this avoids
    # string-vs-number comparisons on numerically inferred columns.
    string_cols = [name for name, dtype in pin_df.dtypes if dtype == 'string']
    for col_ite in string_cols:
        pin_df = pin_df.withColumn(col_ite, when(col(col_ite).isin(misc_rows), None).otherwise(col(col_ite)))

    # Parse follower_count as "<number>[k|M]" and scale by the suffix. Decimal
    # arithmetic keeps '1.2k' -> 1200 exact, whereas a textual 'k' -> '000'
    # substitution yields '1.2000', which the int cast truncates to 1.
    # Values that do not match the pattern become null (as a failed int cast would).
    follower_text = trim(col('follower_count'))
    follower_digits = regexp_extract(follower_text, r'^(\d+(?:\.\d+)?)[kM]?$', 1)
    follower_number = when(follower_digits != '', follower_digits.cast('decimal(20,6)'))
    follower_multiplier = when(follower_text.endswith('k'), 1000) \
        .when(follower_text.endswith('M'), 1000000) \
        .otherwise(1)
    pin_df = pin_df.withColumn('follower_count', (follower_number * follower_multiplier).cast('int'))

    # Ensure numeric data type for columns below.
    # NOTE(review): "downloaded" is cast here but is not in column_order below, so
    # it is dropped from the output — confirm whether it should be kept.
    pin_df = pin_df.withColumn("downloaded", col("downloaded").cast("int"))
    pin_df = pin_df.withColumn("index", col("index").cast("int"))

    # Include only location path
    pin_df = pin_df.withColumn("save_location", regexp_replace(col("save_location"), "Local save in ", ""))

    # Rename the index column to ind
    pin_df = pin_df.withColumnRenamed("index", "ind")

    # Reorder the DataFrame columns
    column_order = [
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category"
    ]
    return pin_df.select(column_order)


pin_clean = clean_pin_df(pin)
display(pin_clean)


In [0]:
def clean_df_geo(df_geo: DataFrame) -> DataFrame:
    '''
    Cleans the raw geo dataframe.

    - Combines latitude and longitude into one array column: coordinates = [latitude, longitude]
    - Converts timestamp to a timestamp type and ind to int
    - Selects the output columns in a fixed order

    The function only builds the transformation; it does not display or print,
    so no extra Spark job is triggered when it is called.

    Params: df_geo -> Raw geo dataframe as returned by load_dataframe
    Returns: df_geo -> Cleaned geo dataframe
    '''
    # Create new column with values from latitude and longitude in array
    df_geo = df_geo.withColumn("coordinates", array(col("latitude"), col("longitude")))
    df_geo = df_geo.drop("latitude", "longitude")

    df_geo = df_geo.withColumn("timestamp", to_timestamp(col("timestamp")))
    df_geo = df_geo.withColumn("ind", col("ind").cast("int"))

    column_order = ["ind", "country", "coordinates", "timestamp"]
    return df_geo.select(column_order)


geo_clean = clean_df_geo(geo)
display(geo_clean)

In [0]:
def clean_df_user(df_user: DataFrame) -> DataFrame:
    '''
    Cleans the raw user dataframe.

    - Combines first_name and last_name into user_name ("<first> <last>")
    - Converts date_joined to a timestamp type and ind/age to int
    - Selects the output columns in a fixed order

    The function only builds the transformation; it does not display or print,
    so no extra Spark job is triggered when it is called.

    Params: df_user -> Raw user dataframe as returned by load_dataframe
    Returns: df_user -> Cleaned user dataframe
    '''
    # Note: Spark's concat returns null if either name part is null.
    df_user = df_user.withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
    df_user = df_user.drop("first_name", "last_name")

    df_user = df_user.withColumn("date_joined", to_timestamp(col("date_joined")))

    df_user = df_user.withColumn("ind", col("ind").cast("int"))
    df_user = df_user.withColumn("age", col("age").cast("int"))

    column_order = ["ind", "user_name", "age", "date_joined"]
    return df_user.select(column_order)


user_clean = clean_df_user(user)
display(user_clean)

In [0]:
# Persist each cleaned dataframe as a managed parquet table, replacing any previous version.
cleaned_tables = {
    "pin_clean": pin_clean,
    "geo_clean": geo_clean,
    "user_clean": user_clean,
}
for table_name, table_df in cleaned_tables.items():
    table_df.write.format("parquet").mode("overwrite").saveAsTable(table_name)