# Setup Environment

In [1]:

# Importing PySpark and Initializing Spark
import findspark
findspark.init()  # This must be executed before importing PySpark
from pyspark.sql import SparkSession
import pyspark
# Lets read the Multiline JSON
from pyspark.sql.functions import explode

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
spark


# Testing imports for 2015 and older data

This is because the json files are saved differently and I wanted to see what could provide me the  correct import. 

In [2]:
data = spark.read.json(r"D:\Data\Kickstarter Data\2015_and_older\Kickstarter_2014-04-22.json", multiLine = True)

data1 = spark.read.json(r"D:\Data\Kickstarter Data\2015_and_older\Kickstarter_2014-08-13.json", multiLine = True, schema = data.schema)

data2 = data.unionByName(data1, allowMissingColumns=True)

print('data count:', data.count())
print('data1 count:', data1.count())
print('data2 count:', data2.select('projects').count())

data2.select('projects.name').show(5, False)

data2 = data2.withColumn('projects', explode('projects'))

data2.select('projects.name').show(5)



data count: 7097
data1 count: 7437
data2 count: 14534
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|name                                                                                                                             

Importing recursively once I figured what was the correct way in which I could imported project level data. Also notice that I allow for missing columns because after 2014/12, they provide mroe columns which leads to more complexity when combining all prior years data with new JSON format.

In [3]:
import os 

# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2015_and_older"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2015 = spark.read.json(os.path.join(dirname, filename), multiLine = True)
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), multiLine = True, schema = data_2015.schema)
            # Union the dataframes
            data_2015 = data_2015.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')

data_2015 = data_2015.withColumn('projects', explode('projects'))


First file: Kickstarter_2014-04-22.json processed
Union Succesful for Kickstarter_2014-08-13.json
Union Succesful for Kickstarter_2014-10-17.json
Union Succesful for Kickstarter_2014-12-02.json
Union Succesful for Kickstarter_2015-04-02.json


In [5]:
# Checking the new schema
data_2015.printSchema()

root
 |-- projects: struct (nullable = true)
 |    |-- backers_count: long (nullable = true)
 |    |-- blurb: string (nullable = true)
 |    |-- category: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- parent_id: long (nullable = true)
 |    |    |-- position: long (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- urls: struct (nullable = true)
 |    |    |    |-- web: struct (nullable = true)
 |    |    |    |    |-- discover: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- created_at: long (nullable = true)
 |    |-- creator: struct (nullable = true)
 |    |    |-- avatar: struct (nullable = true)
 |    |    |    |-- medium: string (nullable = true)
 |    |    |    |-- small: string (nullable = true)
 |    |    |    |-- thumb: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |

## Importing rest of 2015 data

In [4]:
# Rest of 2015
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2015_new"):
    for filename in filenames:
            # Read the first file
            df = spark.read.json(os.path.join(dirname, filename)).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            data_2015 = data_2015.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')


Union Succesful for Kickstarter_2015-11-01T14_09_04_557Z.json
Union Succesful for Kickstarter_2015-12-17T12_09_06_107Z.json


In [5]:
# Sanity Check
data_2015.select('projects.creator.name').show(5)

+--------------------+
|                name|
+--------------------+
|      Maridee Slater|
|April Yvette Thom...|
|        Lucile Scott|
|  Three Day Hangover|
|      Throes Theater|
+--------------------+
only showing top 5 rows



## 2016 old format data

In [6]:
first_file_processed = False

for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2016"):
        for filename in filenames:
            if not first_file_processed:
                # Read the first file
                data_2016 = spark.read.json(os.path.join(dirname, filename), dropFieldIfAllNull=True).\
                        withColumnRenamed('data', 'projects').\
                        drop('created_at',
                            'id',
                            'robot_id',
                            'run_id',
                            'table_id')
                # Set the flag to True
                first_file_processed = True
                print(f'First file: {filenames[0]} processed')
            else:
                df = spark.read.json(os.path.join(dirname, filename), schema = data_2016.schema, dropFieldIfAllNull=True).\
                        withColumnRenamed('data', 'projects').\
                        drop('created_at',
                            'id',
                            'robot_id',
                            'run_id',
                            'table_id')
                # Union the dataframes
                data_2016 = data_2016.unionByName(df)
                print(f'Union Succesful for {filename}')
from pyspark.sql.functions import col

data_2016 = data_2016.where(col("projects.creator.name").isNotNull())

data_2016.createOrReplaceTempView('data_2016')

First file: Kickstarter_2016-01-28T09_15_08_781Z.json.gz processed
Union Succesful for Kickstarter_2016-03-22T07_41_08_591Z.json.gz
Union Succesful for Kickstarter_2016-04-15T02_09_04_328Z.json.gz
Union Succesful for Kickstarter_2016-05-15T02_04_46_813Z.json.gz
Union Succesful for Kickstarter_2016-06-15T02_04_49_697Z.json.gz
Union Succesful for Kickstarter_2016-07-15T02_04_40_862Z.json.gz
Union Succesful for Kickstarter_2016-08-15T02_04_03_829Z.json.gz
Union Succesful for Kickstarter_2016-09-15T02_04_03_474Z.json.gz
Union Succesful for Kickstarter_2016-12-15T22_20_52_411Z.json.gz


Checking whether data is imported correctly with a count function. Doing that with spark is a good way of checking for incorrect imports. Usually this function breaks if there are null rows in the dataset. 

In [7]:
data_2016.count()

147429

# Testing 2016 data for simple repetition of creators

In [8]:
result = spark.sql('''
SELECT 
    projects.creator.name,
    COUNT(*)
FROM
    data_2016
GROUP BY
    projects.creator.name
ORDER BY 
    COUNT(*) DESC
''')
result.show()



+--------------------+--------+
|                name|count(1)|
+--------------------+--------+
|         GBS Detroit|      94|
|             Michael|      61|
|         Game Salute|      58|
|               James|      51|
|              Daniel|      48|
|               Chris|      48|
|               David|      43|
|                John|      41|
|                Ryan|      40|
|              Andrew|      36|
|               Jason|      34|
|Collectable Playi...|      34|
|                Adam|      34|
|         Queen Games|      33|
|Gryphon and Eagle...|      32|
|                Alex|      32|
|                Matt|      29|
|         Christopher|      28|
|               Sarah|      28|
|                Mike|      27|
+--------------------+--------+
only showing top 20 rows



In [9]:
data_2016.printSchema()

root
 |-- projects: struct (nullable = true)
 |    |-- backers_count: long (nullable = true)
 |    |-- blurb: string (nullable = true)
 |    |-- category: struct (nullable = true)
 |    |    |-- color: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- parent_id: long (nullable = true)
 |    |    |-- position: long (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- urls: struct (nullable = true)
 |    |    |    |-- web: struct (nullable = true)
 |    |    |    |    |-- discover: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- created_at: long (nullable = true)
 |    |-- creator: struct (nullable = true)
 |    |    |-- avatar: struct (nullable = true)
 |    |    |    |-- medium: string (nullable = true)
 |    |    |    |-- small: string (nullable = true)
 |    |    |    |-- thumb: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    | 

## 2017 data

In [10]:

# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2017"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2017 = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), multiLine = True, schema = data_2017.schema).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Union the dataframes
            data_2017 = data_2017.union(df)
            print(f'Union Succesful for {filename}')



First file: Kickstarter_2017-01-15T22_21_04_985Z.json.gz processed
Union Succesful for Kickstarter_2017-02-15T22_22_48_377Z.json.gz
Union Succesful for Kickstarter_2017-03-15T22_20_55_874Z.json.gz
Union Succesful for Kickstarter_2017-04-15T22_21_18_122Z.json.gz
Union Succesful for Kickstarter_2017-05-15T22_21_11_300Z.json.gz
Union Succesful for Kickstarter_2017-06-15T22_20_03_059Z.json.gz
Union Succesful for Kickstarter_2017-07-15T22_20_48_951Z.json.gz
Union Succesful for Kickstarter_2017-08-15T22_20_51_958Z.json.gz
Union Succesful for Kickstarter_2017-09-15T22_20_48_432Z.json.gz
Union Succesful for Kickstarter_2017-10-15T10_20_38_271Z.json.gz
Union Succesful for Kickstarter_2017-11-15T10_21_04_919Z.json.gz
Union Succesful for Kickstarter_2017-12-15T10_20_51_610Z.json.gz


## 2018 data

This dataset is slightly weird again. It has a whole column which is null. Therefore, a dropFieldIfAllNull is given here. This leads to increased import time.

In [24]:

# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2018"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2018 = spark.read.json(os.path.join(dirname, filename), dropFieldIfAllNull=True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), dropFieldIfAllNull=True, schema = data_2018.schema).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Union the dataframes
            data_2018 = data_2018.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')



First file: Kickstarter_2018-01-12T10_20_09_196Z.json.gz processed
Union Succesful for Kickstarter_2018-02-15T03_20_44_743Z.json.gz
Union Succesful for Kickstarter_2018-03-15T03_20_39_033Z.json.gz
Union Succesful for Kickstarter_2018-04-12T03_20_13_192Z.json.gz
Union Succesful for Kickstarter_2018-05-17T03_20_08_333Z.json.gz
Union Succesful for Kickstarter_2018-06-14T03_20_15_782Z.json.gz
Union Succesful for Kickstarter_2018-07-12T03_20_16_435Z.json.gz
Union Succesful for Kickstarter_2018-08-16T03_20_13_856Z.json.gz
Union Succesful for Kickstarter_2018-09-13T03_20_17_777Z.json.gz
Union Succesful for Kickstarter_2018-10-18T03_20_48_880Z.json.gz
Union Succesful for Kickstarter_2018-11-15T03_20_50_568Z.json.gz
Union Succesful for Kickstarter_2018-12-13T03_20_05_701Z.json.gz


## 2019, 2020, 2021, 2022 data

In [26]:
import os
from pyspark.sql import SparkSession

def process_files_for_year(year):
    # Initialize Spark session
    spark = SparkSession.builder.appName(f'Kickstarter_{year}').getOrCreate()
    
    data = None
    first_file_processed = False

    directory = os.path.join(r"D:\Data\Kickstarter Data", str(year))
    for dirname, _, filenames in os.walk(directory):
        for filename in filenames:
            file_path = os.path.join(dirname, filename)
            df = spark.read.json(file_path, multiLine=True).\
                withColumnRenamed('data', 'projects').\
                drop('created_at', 'id', 'robot_id', 'run_id', 'table_id')

            if not first_file_processed:
                data = df
                first_file_processed = True
                print(f'First file: {filename} processed')
            else:
                data = data.unionByName(df, allowMissingColumns=True)
                print(f'Union Successful for {filename}')
    
    return data

# Process files for each year
data_2019 = process_files_for_year(2019)
data_2020 = process_files_for_year(2020)
data_2021 = process_files_for_year(2021)
data_2022 = process_files_for_year(2022)


First file: Kickstarter_2019-01-17T03_20_02_630Z.json.gz processed
Union Successful for Kickstarter_2019-02-14T03_20_04_734Z.json.gz
Union Successful for Kickstarter_2019-03-14T03_20_12_200Z.json.gz
Union Successful for Kickstarter_2019-04-18T03_20_02_220Z.json.gz
Union Successful for Kickstarter_2019-05-16T03_20_20_822Z.json.gz
Union Successful for Kickstarter_2019-06-13T03_20_35_801Z.json.gz
Union Successful for Kickstarter_2019-07-18T03_20_05_009Z.json.gz
Union Successful for Kickstarter_2019-08-15T03_20_03_022Z.json.gz
Union Successful for Kickstarter_2019-09-12T03_20_06_215Z.json.gz
Union Successful for Kickstarter_2019-10-17T03_20_19_421Z.json.gz
Union Successful for Kickstarter_2019-11-14T03_20_27_004Z.json.gz
Union Successful for Kickstarter_2019-12-12T03_20_05_306Z.json.gz
First file: Kickstarter_2020-01-16T03_20_15_556Z.json.gz processed
Union Successful for Kickstarter_2020-02-13T03_20_04_893Z.json.gz
Union Successful for Kickstarter_2020-03-12T03_20_06_551Z.json.gz
Union Su

# Checking data integrity


In [27]:
data = [data_2015, data_2016, data_2017, data_2018, data_2019, data_2020, data_2021, data_2022]

for i in data:
    i.select('projects.creator.name').show(1)

+--------------+
|          name|
+--------------+
|Maridee Slater|
+--------------+
only showing top 1 row

+---------------+
|           name|
+---------------+
|Adelfino Corino|
+---------------+
only showing top 1 row

+--------------+
|          name|
+--------------+
|Charlotte Cole|
+--------------+
only showing top 1 row

+------+
|  name|
+------+
|arthur|
+------+
only showing top 1 row

+-----------+
|       name|
+-----------+
|Ryan Turner|
+-----------+
only showing top 1 row

+---------------+
|           name|
+---------------+
|Anh Vongbandith|
+---------------+
only showing top 1 row

+------------+
|        name|
+------------+
|The Backyard|
+------------+
only showing top 1 row

+-----------+
|       name|
+-----------+
|Jorge Muniz|
+-----------+
only showing top 1 row



# Chat-GPT prompt to predict gender

In [None]:
#write a python code to predict gender using an Open AI wrapper

import openai

client = openai.OpenAI(api_key="API KEY", timeout=60)


In [None]:

'''responses = []

for i in range(len(list_of_names)):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=[{"role": "system", "content": "You are a gender/company predictor bot. The following are list of names that you need to classify on whether they are male/female/company. Also provide the probability of the prediction with 4 decimal places. Format in guess/probability"},
                {"role": "user", "content": list_of_names[i]}]
    )
    responses.append(response.choices[0].message.content)

    
'''

In [None]:

#2019
# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2019"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2019 = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Union the dataframes
            data_2019 = data_2019.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')


#2020

# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2020"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2020 = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Union the dataframes
            data_2020 = data_2020.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')



#2021
# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2021"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2021 = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Union the dataframes
            data_2021 = data_2021.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')


# 2022
# Read the JSON files and union them
first_file_processed = False
# Loop through the files in the directory
for dirname, _, filenames in os.walk(r"D:\Data\Kickstarter Data\2022"):
    for filename in filenames:
        if not first_file_processed:
            # Read the first file
            data_2022 = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Set the flag to True
            first_file_processed = True
            print(f'First file: {filename} processed')
        else:
            # Read the next file and union it with the first file
            df = spark.read.json(os.path.join(dirname, filename), multiLine = True).\
                    withColumnRenamed('data', 'projects').\
                    drop('created_at',
                        'id',
                        'robot_id',
                        'run_id',
                        'table_id')
            # Union the dataframes
            data_2022 = data_2022.unionByName(df, allowMissingColumns=True)
            print(f'Union Succesful for {filename}')

