In [0]:
def get_data_by_topic(topic_ext, mount_path="/mnt/0a2bc878981f-s3-data", topic_prefix="0a2bc878981f"):
    """Read the JSON files stored for one topic into a Spark DataFrame.

    Files are read from
    ``{mount_path}/{topic_prefix}.{topic_ext}/partition=0/*.json``.

    Args:
        topic_ext: Topic suffix appended to ``topic_prefix`` (e.g. 'pin', 'geo', 'user').
        mount_path: Directory under which the topic directories are mounted.
        topic_prefix: Prefix shared by every topic directory name.

    Returns:
        A Spark DataFrame whose schema Spark infers from the JSON records.
    """
    file_location = f"{mount_path}/{topic_prefix}.{topic_ext}/partition=0/*.json"
    # The JSON reader always infers the schema; "inferSchema" is a CSV-only
    # option that JSON ignores, so it is not set here.
    return spark.read.format("json").load(file_location)

# Raw Spark DataFrame for every topic, keyed by its topic suffix.
topic_data = dict(
    (topic_ext, get_data_by_topic(topic_ext))
    for topic_ext in ('pin', 'geo', 'user')
)

In [0]:
import pyspark.pandas as ps

In [0]:
## 'Pin' data

In [0]:
# Raw Spark DataFrame for the 'pin' topic, followed by the schema Spark inferred.
pin_data = topic_data['pin']
pin_data.printSchema()

In [0]:
# Switch to the pandas API on Spark for the cleaning steps below.
# NOTE(review): to_pandas_on_spark is deprecated since Spark 3.3 in favour of
# DataFrame.pandas_api(); confirm the runtime version before switching.
pin_df = pin_data.to_pandas_on_spark(index_col=None)

In [0]:
# Preview the first five rows.
pin_df.head(5)

In [0]:
# Column dtypes and non-null counts.
pin_df.info(verbose=None)

In [0]:
 pin_df['category'].unique()

In [0]:
# Store category as a string column.
pin_df['category'] = (
    pin_df['category'].astype('str')
)

In [0]:
# Inspect the distinct descriptions to spot placeholder values.
pin_df['description'].unique()

In [0]:
# Replace whole-value placeholder descriptions with the string 'None'.
# The pattern is anchored at both ends so that genuine descriptions that merely
# contain e.g. "Untitled" are not partially rewritten.
pin_df['description'] = (
    pin_df['description']
    .str.strip()
    .str.replace('^(?:No description available(?: Story format)?|Untitled)$', 'None', regex=True)
)

In [0]:
# Store description as a string column.
pin_df['description'] = (
    pin_df['description'].astype('str')
)

In [0]:
# Distinct values of the downloaded flag.
pin_df['downloaded'].unique()

In [0]:
# Store downloaded as an integer column.
pin_df['downloaded'] = (
    pin_df['downloaded'].astype('int')
)

In [0]:
# Rows whose follower_count contains at least one non-digit character
# (e.g. the 'k' / 'M' suffixes expanded further down).
has_non_digit = pin_df['follower_count'].str.contains('[^0-9]+')
non_integer_follower_counts = pin_df[has_non_digit]
non_integer_follower_counts

In [0]:
# Rows whose follower_count contains no digit at all; these rows are dropped
# from pin_df in a later cell.
has_digit = pin_df['follower_count'].str.contains('[0-9]+')
invalid_follower_counts = pin_df[~has_digit]

In [0]:
# Drop rows whose follower_count contains no digit at all.
# A boolean mask is used instead of concat(...).drop_duplicates(keep=False):
# that anti-join trick also deleted every copy of any row that was already
# duplicated in pin_df. Rows where the digit check is null (e.g. a null
# follower_count) are kept, as the anti-join also kept them.
pin_df = pin_df[pin_df['follower_count'].str.contains('[0-9]+').fillna(True)]

In [0]:
# Expand the 'k' and 'M' suffixes into trailing zeros so the column can be cast
# to int later.
# NOTE(review): a decimal count such as '1.5k' would become '1.5000' rather
# than '1500'; confirm the data contains no decimal counts.
pin_df['follower_count'] = (
    pin_df['follower_count']
    .str.replace('k', '000')
    .str.replace('M', '000000')
)

In [0]:
# Sanity check: rows still lacking any digit in follower_count (expected to be empty).
pin_df[~(pin_df['follower_count'].str.contains('[0-9]+'))]

In [0]:
# Store follower_count as an integer column.
pin_df['follower_count'] = (
    pin_df['follower_count'].astype('int')
)

In [0]:
# Rows whose image_src does not start with 'https'.
pin_df[~(pin_df['image_src'].str.contains('^https'))]

In [0]:
# Replace the 'Image src error.' placeholder with the string 'None'.
# regex=False makes the trailing '.' a literal full stop instead of a
# match-any-character wildcard.
pin_df['image_src'] = pin_df['image_src'].str.replace('Image src error.', 'None', regex=False)

In [0]:
# Store image_src as a string column.
pin_df['image_src'] = (
    pin_df['image_src'].astype('str')
)

In [0]:
# Rename the 'index' column to 'ind', the key name used in the final column
# order (and by the geo and user frames).
pin_df = pin_df.rename(columns=dict(index='ind'))

In [0]:
# Distinct values of the media-type column.
pin_df['is_image_or_video'].unique()

In [0]:
# Store is_image_or_video as a string column.
pin_df['is_image_or_video'] = (
    pin_df['is_image_or_video'].astype('str')
)

In [0]:
# Inspect the distinct poster names to spot placeholder values.
pin_df['poster_name'].unique()

In [0]:
# Replace the 'User Info Error.' placeholder with the string 'None'.
# regex=False makes the trailing '.' a literal full stop instead of a
# match-any-character wildcard.
pin_df['poster_name'] = pin_df['poster_name'].str.replace('User Info Error.', 'None', regex=False)

In [0]:
# Store poster_name as a string column.
pin_df['poster_name'] = (
    pin_df['poster_name'].astype('str')
)

In [0]:
# Inspect the distinct save locations.
pin_df['save_location'].unique()

In [0]:
# Remove the 'Local save in' prefix, keeping only the path.
# Stripping must happen after the prefix is removed; stripping first left the
# space that followed the prefix at the start of every path.
pin_df['save_location'] = pin_df['save_location'].str.replace('Local save in', '').str.strip()

In [0]:
# Store save_location as a string column. The target was previously a
# misspelled new column ('postersave_location_name'), so save_location itself
# was never cast.
pin_df['save_location'] = pin_df['save_location'].astype('str')

In [0]:
# Inspect the distinct tag lists to spot placeholder values.
pin_df['tag_list'].unique()

In [0]:
# The "no tags" placeholder appears as the comma-separated characters of
# 'No Tags Available'; replace it with the string 'None'.
pin_df['tag_list'] = pin_df['tag_list'].str.replace(
    'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
    'None',
)

In [0]:
# Store tag_list as a string column.
pin_df['tag_list'] = (
    pin_df['tag_list'].astype('str')
)

In [0]:
# Inspect the distinct titles to spot placeholder values.
pin_df['title'].unique()

In [0]:
# Replace the 'No Title Data Available' placeholder with the string 'None'.
pin_df['title'] = pin_df['title'].str.replace(
    'No Title Data Available',
    'None',
)

In [0]:
# Store title as a string column.
pin_df['title'] = (
    pin_df['title'].astype('str')
)

In [0]:
# Store unique_id as a string column.
pin_df['unique_id'] = (
    pin_df['unique_id'].astype('str')
)

In [0]:
# Final column order for the cleaned pin data; columns not listed here
# (e.g. 'downloaded') are dropped.
pin_df = pin_df.reindex(
    columns=[
        'ind',
        'unique_id',
        'title',
        'description',
        'follower_count',
        'poster_name',
        'tag_list',
        'is_image_or_video',
        'image_src',
        'save_location',
        'category',
    ]
)

In [0]:
## 'Geo' data

In [0]:
# Raw Spark DataFrame for the 'geo' topic, followed by the schema Spark inferred.
geo_data = topic_data['geo']
geo_data.printSchema()

In [0]:
# Switch to the pandas API on Spark for the cleaning steps below.
# NOTE(review): to_pandas_on_spark is deprecated since Spark 3.3 in favour of
# DataFrame.pandas_api(); confirm the runtime version before switching.
geo_df = geo_data.to_pandas_on_spark(index_col=None)

In [0]:
# Preview the first five rows.
geo_df.head(5)

In [0]:
# Column dtypes and non-null counts.
geo_df.info(verbose=None)

In [0]:
# Build a two-element [latitude, longitude] array column (as strings) by
# joining both values with a comma and splitting them again.
lat_lon_text = geo_df['latitude'].astype('str') + ',' + geo_df['longitude'].astype('str')
geo_df['coordinates'] = lat_lon_text
geo_df['coordinates'] = geo_df['coordinates'].str.split(',')

In [0]:
# Display the new coordinates column.
geo_df['coordinates']

In [0]:
# latitude/longitude are now carried by 'coordinates'.
geo_df = geo_df.drop(columns=['latitude', 'longitude'])

In [0]:
# Parse timestamp into a datetime column; unparseable values raise.
geo_df['timestamp'] = ps.to_datetime(geo_df['timestamp'], errors='raise')

In [0]:
# Final column order for the cleaned geo data.
geo_df = geo_df.reindex(
    columns=[
        'ind',
        'country',
        'coordinates',
        'timestamp',
    ]
)

In [0]:
# Store ind as an integer column.
geo_df['ind'] = (
    geo_df['ind'].astype('int')
)

In [0]:
# Store country as a string column.
geo_df['country'] = (
    geo_df['country'].astype('str')
)

In [0]:
## Users

In [0]:
# Raw Spark DataFrame for the 'user' topic, followed by the schema Spark inferred.
user_data = topic_data['user']
user_data.printSchema()

In [0]:
# Switch to the pandas API on Spark for the cleaning steps below.
# NOTE(review): to_pandas_on_spark is deprecated since Spark 3.3 in favour of
# DataFrame.pandas_api(); confirm the runtime version before switching.
user_df = user_data.to_pandas_on_spark(index_col=None)

In [0]:
# Preview the first five rows.
user_df.head(5)

In [0]:
# Column dtypes and non-null counts.
user_df.info(verbose=None)

In [0]:
# Combine first and last name into one space-separated user_name column.
first_names = user_df['first_name']
last_names = user_df['last_name']
user_df['user_name'] = first_names + ' ' + last_names

In [0]:
# first_name/last_name are now carried by 'user_name'.
user_df = user_df.drop(columns=['first_name', 'last_name'])

In [0]:
# Parse date_joined into a datetime column; unparseable values raise.
user_df['date_joined'] = ps.to_datetime(user_df['date_joined'], errors='raise')

In [0]:
# Final column order for the cleaned user data.
user_df = user_df.reindex(
    columns=[
        'ind',
        'user_name',
        'age',
        'date_joined',
    ]
)