Note: This notebook contains the operations performed before the data exploration. Further transformations, such as scaling and feature selection, will be performed before the predictive analysis.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofweek, datediff
from pyspark.sql.functions import length, col, expr

# Initialize a Spark session configured to be executed in local mode
# (an already running session is reused by getOrCreate)
spark = (
    SparkSession.builder
    .master("local")
    .appName("AppName")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 17:49:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Open the original dataset, available at
# https://www.kaggle.com/datasets/kemical/kickstarter-projects
#   - escape:      '"' is the escape character, useful if the separator appears inside data
#   - header:      the file has a header row
#   - inferSchema: automatically infer the data type of each column
#   - mode:        data rows which do not respect the format are dropped
df = spark.read.csv(
    'ks-projects-201801.csv',
    escape='"',
    header=True,
    inferSchema=True,
    mode="DROPMALFORMED",
)

                                                                                

### Select only successful and failed projects

In [6]:
# Keep only the two final outcomes of interest ("successful" and "failed");
# every other project state is excluded.
df = df.filter(col("state").isin("successful", "failed"))

### Remove unneeded features and rename columns

In [7]:
# Remove the columns that are not used in the rest of the analysis
df = df.drop(
    "ID",
    "goal",
    "pledged",
    "usd pledged",
)

In [9]:
# Rename columns: the conversions made with the fixer.io API are kept,
# under the shorter names "pledged" and "goal"
df = (
    df
    .withColumnRenamed("usd_pledged_real", "pledged")
    .withColumnRenamed("usd_goal_real", "goal")
)

### Extract year, month, day of the week and time interval

In [10]:
# Derive calendar features from "launched" and the campaign duration
df = (
    df
    # year, month and day of the week of the launch date
    # (Spark's dayofweek numbers days from 1 = Sunday to 7 = Saturday)
    .withColumn("year", year("launched"))
    .withColumn("month", month("launched"))
    .withColumn("day_of_week", dayofweek("launched"))
    # time interval in days between launch and deadline
    .withColumn("time_interval", datediff("deadline", "launched"))
)

In [11]:
# Remove the raw date columns
df = df.drop(
    "deadline",
    "launched",
)

### Extract information from the title of the projects

In [13]:
# Title features:
#   - length_of_title: length of the project name
#   - use_of_?!:       1 if the name contains a '?' or '!' character, 0 otherwise
df = (
    df
    .withColumn("length_of_title", length(col("name")))
    .withColumn(
        "use_of_?!",
        expr("CASE WHEN INSTR(name, '?') > 0 OR INSTR(name, '!') > 0 THEN 1 ELSE 0 END"),
    )
)

### Group countries by continent

In [14]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType

# Map each country into its continent, to have a more coarse-grained variable 

# Continent -> country codes belonging to it.
# The "" entry keeps the "N,0" value explicit; presumably it is the placeholder
# used in the raw data for an unknown country (verify against the dataset).
_COUNTRIES_BY_CONTINENT = {
    "America": ["MX", "CA", "US"],
    "Europe": ["NL", "AT", "GB", "DE", "ES", "FR", "CH", "IT", "SE", "IE", "BE", "NO", "LU", "DK"],
    "Asia": ["HK", "SG", "JP"],
    "Oceania": ["NZ", "AU"],
    "": ["N,0"]
}

# Reverse index (country code -> continent), built once instead of once per row.
_CONTINENT_BY_COUNTRY = {
    code: continent
    for continent, codes in _COUNTRIES_BY_CONTINENT.items()
    for code in codes
}


@udf(StringType())
def assign_continent(country):
    """Return the continent name for a country code.

    Args:
        country: country code taken from the "country" column (may be None).

    Returns:
        "America", "Europe", "Asia" or "Oceania" for a known code; the empty
        string "" (not None) when the code is missing or not in the mapping.
    """
    return _CONTINENT_BY_COUNTRY.get(country, "")

In [15]:
# Add the continent computed from each project's country
df = df.withColumn(
    "continent",
    assign_continent(df["country"]),
)

In [16]:
# Remove the fine-grained country column
df = df.drop(
    "country",
)

### Save resulting dataframe

In [17]:
# Save the resulting dataframe as CSV with a header row, overwriting any
# previous output at the same path
(
    df.write
    .mode("overwrite")
    .option('escape', '"')
    .option('header', 'true')
    .csv('kickstarter_cleaned.csv')
)

                                                                                