In [None]:
from nyt_article_search import JSONParse
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql.types import *
from itertools import chain
import pandas as pd
import json
import os

### Load JSONs and transform into lists of tuples

In [None]:
# Function to extend each list as we parse the jsons
# The get_article_authors and search_article_keywords methods
# return a list of tuples so this will create one master list of all tuples
def extend_list(list_name, function_result):
    """Append every item of ``function_result`` to ``list_name`` in place.

    Parameters
    ----------
    list_name : list
        Accumulator that is mutated in place.
    function_result : iterable or None
        Items to append. ``None`` means "nothing to add" and is ignored.

    Returns
    -------
    None
    """
    # Identity check on purpose: `!= None` is dispatched to the operand's
    # __ne__, which may be overridden and does not reliably mean "is not None".
    if function_result is not None:
        list_name.extend(function_result)

In [None]:
# Accumulators that collect the parsed data from every json response.
# Each one starts empty and is filled by the loading loop.
article_ids, fact_data = list(), list()
author_data, subject_data, people_data = list(), list(), list()
org_data, loc_data = list(), list()
# big_text is a dictionary keyed by article id; each entry holds the
# headline, lead paragraph, abstract and web url of that article
big_text = dict()

In [None]:
# Parse every json file of one state folder into the accumulator lists
data_folder = 'DATA'
state = 'TEXAS'
# states = [f for f in os.listdir(data_folder) if f != '_Archive']
state_folder = os.path.join(data_folder, state)
# os.listdir returns entries in arbitrary order; sorting keeps the article
# order (and every id later derived from it) identical on each run.
json_files = sorted(os.listdir(state_folder))
for file in json_files:
    filepath = os.path.join(state_folder, file)
    # The context manager closes the handle even when json.load raises
    with open(filepath, 'r') as json_file:
        data = json.load(json_file)
    responses = data['response']['docs']
    # Parse each json response in the json file
    for response in responses:
        j = JSONParse(response)
        article_ids.append(j.article_id)
        fact_data.append(j.get_article_facts())
        extend_list(author_data, j.get_article_authors())
        extend_list(subject_data, j.search_article_keywords('subject'))
        extend_list(people_data, j.search_article_keywords('persons'))
        extend_list(org_data, j.search_article_keywords('organizations'))
        extend_list(loc_data, j.search_article_keywords('glocations'))
        # Element [1] of each get_text result is stored as the text value
        id_text_dict = {
            j.article_id: {
                'headline': j.get_text('headline')[1],
                'abstract': j.get_text('abstract')[1],
                'lead_paragraph': j.get_text('lead_paragraph')[1],
                'web_url': j.get_text('web_url')[1]
            }
        }
        big_text.update(id_text_dict)

In [None]:
# Get (or reuse) the Spark session for this ETL, running on the
# 'local[1]' master
spark = (
    SparkSession.builder
    .appName('NYT_JSON_ETL')
    .master('local[1]')
    .getOrCreate()
)

In [None]:
# Column headers for each of the resulting dataframes
def get_table_headers(table):
    """Return the ordered list of column names for one output table.

    ``table`` is one of 'facts', 'authors', 'subjects' or 'text'.
    Any other key returns ``None``.
    """
    fact_cols = [
        'article_id', 'publication_date', 'word_count', 'total_keywords',
        'total_authors', 'words_in_headline', 'in_print', 'print_page',
        'print_section', 'news_desk', 'section_name', 'article_type',
    ]
    author_cols = [
        'article_id', 'rank', 'role', 'firstname', 'middlename',
        'lastname', 'qualifier',
    ]
    keyword_cols = ['article_id', 'rank', 'name', 'value', 'major']
    text_cols = ['article_id', 'text']

    columns_by_table = dict(
        facts=fact_cols,
        authors=author_cols,
        subjects=keyword_cols,
        text=text_cols,
    )
    return columns_by_table.get(table)

In [None]:
# Build one Spark dataframe per parsed list.
# people, organizations and locations reuse the 'subjects' column headers.
facts = spark.createDataFrame(
    fact_data,
    schema=get_table_headers('facts'),
)
authors = spark.createDataFrame(
    author_data,
    schema=get_table_headers('authors'),
)
subjects = spark.createDataFrame(
    subject_data,
    schema=get_table_headers('subjects'),
)
people = spark.createDataFrame(
    people_data,
    schema=get_table_headers('subjects'),
)
organizations = spark.createDataFrame(
    org_data,
    schema=get_table_headers('subjects'),
)
locations = spark.createDataFrame(
    loc_data,
    schema=get_table_headers('subjects'),
)

### Step 1: Create Primary Keys in tables

In [None]:
# First step is to create an integer primary key for the article_ids:
# every article id is paired with a running number that starts at 1000
ids = list(enumerate(article_ids, start=1000))
# Create spark df out of id list
# Integer Primary Key is fact_id
id_schema = StructType([
    StructField(name='fact_id', dataType=IntegerType(), nullable=False),
    StructField(name='article_id', dataType=StringType(), nullable=False),
])

### THIS SHOULD NOT CHANGE ANY FURTHER
id_df = spark.createDataFrame(ids, schema=id_schema)

# Merge the id_df dataframe into the existing frames to put fact_id
# in all the other tables; article_id is dropped once fact_id is attached
facts = id_df.join(facts, on='article_id', how='inner').drop('article_id')
authors = id_df.join(authors, on='article_id', how='inner').drop('article_id')
subjects = id_df.join(subjects, on='article_id', how='inner').drop('article_id')
people = id_df.join(people, on='article_id', how='inner').drop('article_id')
organizations = id_df.join(organizations, on='article_id', how='inner').drop('article_id')
locations = id_df.join(locations, on='article_id', how='inner').drop('article_id')

In [None]:
# Stack the subjects, organizations, and locations dfs into one dataframe,
# sorted by fact_id and then rank
places_and_things = (
    subjects
    .union(organizations)
    .union(locations)
    .orderBy(['fact_id', 'rank'])
)

#

In [None]:
# Window spec shared by the key-building calls:
# one partition per fact_id, rows inside it ordered by rank
window = Window.partitionBy('fact_id').orderBy('rank')
# Create primary key for places_and_things, authors, and people dataframes
def create_primary_key(df, key_name, article_id, w):
    """Add a ``key_name`` column equal to ``<id> + row_number / 1000``.

    df         -- dataframe that receives the new column
    key_name   -- name of the column to create (overwritten if it exists)
    article_id -- name of the existing id column the key is built on
    w          -- window spec used to number the rows

    Returns the dataframe with the extra column.

    NOTE: the row number is stored as a decimal fraction, so a row number
    of 1000 or more carries over into the integer part of the key.
    """
    position_in_window = f.row_number().over(w)
    keyed = (
        df
        # 1) row number inside the window
        .withColumn(key_name, position_in_window)
        # 2) turn it into a decimal fraction
        .withColumn(key_name, f.col(key_name) / 1000)
        # 3) add the fraction to the id column to get a logical key
        .withColumn(key_name, f.col(article_id) + f.col(key_name))
    )
    return keyed

# Give every detail table a 'table_id' key derived from fact_id
places_and_things = create_primary_key(
    df=places_and_things, key_name='table_id', article_id='fact_id', w=window
)
authors = create_primary_key(
    df=authors, key_name='table_id', article_id='fact_id', w=window
)
people = create_primary_key(
    df=people, key_name='table_id', article_id='fact_id', w=window
)
# Preview the first two rows of each keyed table
places_and_things.show(2)
authors.show(2)
people.show(2)


### Step 2: Text standardization

In [None]:
# Paths of the dimension-table CSV files
dim_folder = os.path.join('DATA', 'DIM_TABLES')
dim_news_desk = os.path.join(dim_folder, 'news_desks.csv')
dim_types = os.path.join(dim_folder, 'article_types.csv')
dim_sections = os.path.join(dim_folder, 'section_names.csv')

# Read each CSV, using its first line as the column names
news_desk_df = spark.read.csv(dim_news_desk, header=True)
material_df = spark.read.csv(dim_types, header=True)
section_df = spark.read.csv(dim_sections, header=True)

In [None]:
def clean_text(clean_df, clean_col, dim_df):
    """Snap the values of ``clean_col`` onto the spellings listed in ``dim_df``.

    Values that already appear in the dimension table are left untouched.
    Every other non-null value is replaced by the dimension value with the
    smallest Levenshtein distance. When several dimension values share that
    smallest distance, the alphabetically first one is used so the result
    is the same on every run.

    Parameters
    ----------
    clean_df : DataFrame
        The dataframe to be cleaned.
    clean_col : str
        The column to be cleaned.
    dim_df : DataFrame
        The dimension table to clean with. It must have a column named
        ``clean_col`` and a column named ``f'{clean_col}_id'``.

    Returns
    -------
    DataFrame
        ``clean_df`` with ``clean_col`` standardized; the literal string
        'None' is turned into a real null.
    """
    # Replace any 'None' strings with None type
    clean_df = clean_df.withColumn(
        clean_col,
        f.when(clean_df[clean_col] == 'None', None).otherwise(clean_df[clean_col])
    )
    # Get distinct values where not null
    distincts = (
        clean_df
        .where(clean_df[clean_col].isNotNull())
        .select(clean_df[clean_col])
        .distinct()
    )
    # Left join: values missing from the dimension table get a null id
    distincts = distincts.join(dim_df, [clean_col], how='left')
    # The id column of the dimension table
    id_col = f'{clean_col}_id'
    dirty_text = (
        distincts
        .where(distincts[id_col].isNull())
        .select(distincts[clean_col].alias('dirty_text'))
    )
    # Cross join with every dimension value and calculate levenshtein distance
    dirty_text = dirty_text.crossJoin(dim_df).select(['dirty_text', clean_col])
    dirty_text = dirty_text.withColumn(
        'levenshtein', f.levenshtein('dirty_text', clean_col)
    )

    # Keep only the candidates at the minimum distance (min = best match)
    min_lev = dirty_text.groupBy('dirty_text').agg(
        f.min('levenshtein').alias('levenshtein')
    )
    dirty_text = dirty_text.join(min_lev, ['dirty_text', 'levenshtein'])

    # Several candidates can tie at the minimum distance. Ordering by the
    # candidate value (not by the partition column itself) makes row 1 a
    # well-defined choice instead of an arbitrary one.
    tie_window = Window.partitionBy('dirty_text').orderBy(clean_col)
    dirty_text = dirty_text.withColumn('row_num', f.row_number().over(tie_window))
    dirty_text = (
        dirty_text
        .where(dirty_text['row_num'] == 1)
        .select(['dirty_text', clean_col])
    )
    text_mapping = {
        row['dirty_text']: row[clean_col] for row in dirty_text.collect()
    }
    # Nothing to fix when every value is already in the dimension table
    if text_mapping:
        clean_df = clean_df.replace(text_mapping, subset=[clean_col])
    return clean_df





In [None]:
# Standardize three text columns of facts, each against its own
# dimension table, then display the result
facts = clean_text(clean_df=facts, clean_col='article_type', dim_df=material_df)
facts = clean_text(clean_df=facts, clean_col='news_desk', dim_df=news_desk_df)
facts = clean_text(clean_df=facts, clean_col='section_name', dim_df=section_df)
facts.show()


In [None]:
# Collect every print_page value into a plain Python list
page_rdd = [row[0] for row in facts.select('print_page').collect()]
# unique pages in df
pages = list(set(page_rdd))



In [None]:
# Map every distinct raw print_page value to a float page number
pages_cleaned = {}
for page in pages:
    if page is None:
        # Nulls get no mapping entry
        continue
    try:
        pages_cleaned[page] = float(page)
    except (TypeError, ValueError):
        # Not directly numeric: retry without the last character
        try:
            pages_cleaned[page] = float(page[:-1])
        except (TypeError, ValueError):
            # Still not numeric (e.g. an empty string). Leave it unmapped
            # rather than aborting the whole cell on one bad value.
            # NOTE(review): unmapped values are expected to come back as
            # null from the map lookup below - confirm for the Spark
            # configuration in use.
            continue

if pages_cleaned:
    # create_map takes alternating key, value literals
    mapping = f.create_map([f.lit(x) for x in chain(*pages_cleaned.items())])

    # facts.select(mapping[facts['print_page']].alias('page_clean')).show()
    facts = facts.withColumn('print_page', mapping[facts['print_page']])
else:
    # No parsable page at all: an empty map has no usable key/value types,
    # so write an explicit null double column instead.
    facts = facts.withColumn('print_page', f.lit(None).cast('double'))