## PySpark test script to collate JSON blobs

This notebook collates the separate JSON documents that are accepted and stored via the inbound API.
The inbound API stores JSON locally and cleans up after itself on a schedule.
The goal of this exercise is to gather inputs A, B and C, combine them, and store the result in a separate location.
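
The gather, combine, store flow described above can be sketched without Spark, using only the standard library. This is a minimal illustration rather than the notebook's method: the helper `collate_json` and the output name `combined.json` are hypothetical, and it assumes each input file holds a single JSON object.

```python
import json
import tempfile
from pathlib import Path


def collate_json(input_dir, output_file):
    """Combine every *.json file in input_dir into one JSON array (hypothetical helper)."""
    records = []
    # sorted() keeps the order stable: inputA, inputB, inputC
    for path in sorted(Path(input_dir).glob("*.json")):
        with open(path, encoding="utf-8") as fh:
            records.append(json.load(fh))
    output_file = Path(output_file)
    # the combined file lives in a separate location from the inputs
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as fh:
        json.dump(records, fh)
    return records


# demo with throwaway directories standing in for the inbound and output locations
with tempfile.TemporaryDirectory() as tmp:
    inbound = Path(tmp) / "aggregate"
    inbound.mkdir()
    for name in ("inputA", "inputB", "inputC"):
        (inbound / f"{name}.json").write_text(json.dumps({"title": name}), encoding="utf-8")
    combined = collate_json(inbound, Path(tmp) / "combined" / "combined.json")
    print(len(combined))
```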

In [1]:
from pyspark.sql import SparkSession
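# explicit imports avoid shadowing Python builtins such as sum, min and max
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType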
import glob
import os
import pandas as pd

In [2]:
spark = SparkSession.builder.master('local[*]')\
    .appName("inboundcollation")\
    .getOrCreate()

In [3]:
inboundDir = '../data/aggregate/'

jsonSchema = StructType([
    StructField("title", StringType(), False),
    StructField("author", StringType(), True),
    StructField("project", StringType(), False),
    StructField("date_published", StringType(), True),
    StructField("lead_image_url", StringType(), True),
    StructField("content", StringType(), False),
    StructField("next_page_url", StringType(), True),
    StructField("url", StringType(), False),
    StructField("domain", StringType(), True),
    StructField("excerpt", StringType(), True),
    StructField("word_count", IntegerType(), False),
    StructField("direction", StringType(), True),
    StructField("total_pages", IntegerType(), True),
    StructField("rendered_pages", IntegerType(), True),
])
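
Five fields in the schema are declared non-nullable (`title`, `project`, `content`, `url`, `word_count`). If you want to catch records that miss them before Spark reads the file, a plain-Python pre-check is one option. This is only a sketch: `missing_required` is a hypothetical helper and the sample record is made up.

```python
# fields declared with nullable=False in jsonSchema
REQUIRED_FIELDS = ("title", "project", "content", "url", "word_count")


def missing_required(record):
    """Return the required fields that are absent or None in a parsed JSON record."""
    return [name for name in REQUIRED_FIELDS if record.get(name) is None]


# made-up record for illustration: 'project' and 'word_count' are missing
sample = {"title": "Example", "content": "text", "url": "https://example.com"}
print(missing_required(sample))
```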


In [26]:
df_app = spark.read.schema(jsonSchema).json("../data/aggregate/inputA.json", multiLine=True)

In [27]:
# note: without parentheses this displays the bound method, not summary statistics
df_app.describe

<bound method DataFrame.describe of DataFrame[title: string, author: string, date_published: string, lead_image_url: string, content: string, next_page_url: string, url: string, domain: string, excerpt: string, word_count: int, direction: string, total_pages: int, rendered_pages: int]>

In [14]:
# UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows
# This occurs because winutils isn't installed. winutils simulates a Hadoop installation on Windows;
# look into adding it and pointing %HADOOP_HOME% at it.
# This seems to be the only way to run PySpark jobs on this machine, which gives me pause for the project.
# A potential resolution may be to run this in a container instead.
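
Before starting a session on Windows, it can help to check whether `HADOOP_HOME` points at a directory that contains `bin\winutils.exe`, which is where Hadoop conventionally looks for it. The sketch below is one assumed way to run that check; `winutils_available` is a hypothetical helper and the demo uses a throwaway directory, not a real install path.

```python
import os
import tempfile


def winutils_available(hadoop_home=None):
    """Return True if <hadoop_home>/bin/winutils.exe exists (hypothetical helper)."""
    # fall back to the HADOOP_HOME environment variable when no path is given
    hadoop_home = hadoop_home or os.environ.get("HADOOP_HOME")
    if not hadoop_home:
        return False
    return os.path.isfile(os.path.join(hadoop_home, "bin", "winutils.exe"))


# demo against a throwaway directory that mimics the expected layout
with tempfile.TemporaryDirectory() as fake_home:
    os.makedirs(os.path.join(fake_home, "bin"))
    open(os.path.join(fake_home, "bin", "winutils.exe"), "w").close()
    found = winutils_available(fake_home)
print(found)
```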

In [29]:
pandas = df_app.toPandas()
pandas.head()

Unnamed: 0,title,author,date_published,lead_image_url,content,next_page_url,url,domain,excerpt,word_count,direction,total_pages,rendered_pages
0,Application Performance Monitoring AWS Lambda ...,Matt Makai,2019-05-21T16:21:43.081Z,https://www.fullstackpython.com/img/headers/py...,\nAmazon Web Services (AWS) Lambda is a usage-...,,https://www.fullstackpython.com/blog/applicati...,fullstackpython,Learn how to use Sentry Application Performanc...,1113,ltr,1,1


In [4]:
# new plan - I have to use both os.walk and glob to get what I need:
# os.walk visits every subdirectory, glob picks out the .json files in each one
all_files = []
for root, _dirs, _files in os.walk('../data/aggregate'):
    for f in glob.glob(os.path.join(root, '*.json')):
        all_files.append(os.path.abspath(f))

In [23]:
print(all_files)

['x:\\bloganalysis\\bloganalysis\\data\\aggregate\\inputA.json', 'x:\\bloganalysis\\bloganalysis\\data\\aggregate\\inputB.json', 'x:\\bloganalysis\\bloganalysis\\data\\aggregate\\inputC.json']
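
The walk-plus-glob loop can also be written as a single recursive glob. A sketch, assuming the same goal of absolute paths for every `.json` file under a directory; `find_json_files` is a hypothetical name, and the demo uses a temporary directory rather than the project's data folder.

```python
import glob
import os
import tempfile


def find_json_files(base_dir):
    """Return sorted absolute paths of all *.json files under base_dir, at any depth."""
    # '**' together with recursive=True matches base_dir itself and all subdirectories
    pattern = os.path.join(base_dir, "**", "*.json")
    return sorted(os.path.abspath(p) for p in glob.glob(pattern, recursive=True))


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "nested"))
    for rel in ("inputA.json", os.path.join("nested", "inputB.json"), "notes.txt"):
        open(os.path.join(tmp, rel), "w").close()
    found_files = find_json_files(tmp)
    print([os.path.basename(p) for p in found_files])
```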


In [5]:
def create_empty_dataframe():
    # specify column name and data type
    columns = [('title', str),
               ('author', str),
               ('project', str),
               ('date_published', str),
               ('lead_image_url', str),
               ('content', str),
               ('next_page_url', str),
               ('url', str),
               ('domain', str),
               ('excerpt', str),
               ('word_count', int),
               ('direction', str),
               ('total_pages', int),
               ('rendered_pages', int)]
    # create the empty dataframe from a dict of typed, empty Series
    return pd.DataFrame({k: pd.Series(dtype=t) for k, t in columns})

emptyDF = create_empty_dataframe()
emptyDF.head()

Unnamed: 0,title,author,project,date_published,lead_image_url,content,next_page_url,url,domain,excerpt,word_count,direction,total_pages,rendered_pages


In [12]:


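# read each file with Spark, convert it to pandas, and append it to the running result
for file in all_files:
    df_app = spark.read.schema(jsonSchema).json(file, multiLine=True)
    pandas = df_app.toPandas()
    emptyDF = pd.concat([emptyDF, pandas])

# df_app only holds the last file read at this point
print(df_app)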

DataFrame[title: string, author: string, project: string, date_published: string, lead_image_url: string, content: string, next_page_url: string, url: string, domain: string, excerpt: string, word_count: int, direction: string, total_pages: int, rendered_pages: int]


In [13]:
emptyDF.head()

Unnamed: 0,title,author,project,date_published,lead_image_url,content,next_page_url,url,domain,excerpt,word_count,direction,total_pages,rendered_pages
0,Application Performance Monitoring AWS Lambda ...,Matt Makai,Masters,2019-05-21T16:21:43.081Z,https://www.fullstackpython.com/img/headers/py...,\nAmazon Web Services (AWS) Lambda is a usage-...,,https://www.fullstackpython.com/blog/applicati...,fullstackpython,Learn how to use Sentry Application Performanc...,1113,ltr,1,1
0,Asynchronous Web Scraping With Python AIOHTTP,Ronnie Atuhaire,Masters,2019-05-21T16:21:43.081Z,https://hashnode.com/utility/r?url=https%3A%2F...,"Hey there 👋, welcome here! Having looked at As...",,https://blog.octachart.com/asynchronous-web-sc...,blog.octachart.com,"Hey there 👋, welcome here! Having looked at As...",671,ltr,1,1
0,Automating Excel with Python Video Overview - ...,Not found,Masters,2019-05-21T16:21:43.081Z,https://miro.medium.com/max/640/0*n4Chf511o44y...,"In this tutorial, I will show you an overview ...",,https://www.blog.pythonlibrary.org/2022/03/29/...,www.blog.pythonlibrary.org,"In this tutorial, I will show you an overview ...",67,ltr,1,1
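
The output above shows every row with index 0, because each per-file frame keeps its own index when concatenated. One way to get a running index, and to avoid re-copying the accumulated frame on every iteration, is to collect the frames in a list and concatenate once with `ignore_index=True`. A sketch with made-up two-column frames standing in for the per-file results:

```python
import pandas as pd

# made-up stand-ins for the pandas frames produced from each input file
frames = [
    pd.DataFrame({"title": ["A"], "word_count": [10]}),
    pd.DataFrame({"title": ["B"], "word_count": [20]}),
    pd.DataFrame({"title": ["C"], "word_count": [30]}),
]

# a single concat call; ignore_index=True renumbers the rows 0..n-1
combined_df = pd.concat(frames, ignore_index=True)
print(combined_df.index.tolist())
```

On the Spark side, `spark.read.json` also accepts a list of paths, so passing `all_files` directly may remove the need for the loop altogether.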
