In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import *
spark = SparkSession.builder.appName('logistic_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip3 install numpy --user' into the console.
# If you're getting an error with another package, type 'sudo pip3 install PACKAGENAME --user'. 
# Replace PACKAGENAME with the relevant package (such as pandas, etc).
from pyspark.ml.classification import LogisticRegression

# Import data and print schema - columns is another way to view the data's features.
df = spark.read.csv('Datasets/IVS_Country.csv', header=True, inferSchema=True)
df.printSchema()
print(df.columns)

root
 |-- Pkey: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Year ending: string (nullable = true)
 |-- Airport of departure: string (nullable = true)
 |-- Purpose of visit: string (nullable = true)
 |-- Country of permanent residence: string (nullable = true)
 |-- Total visitor spend: double (nullable = true)
 |-- Total visitors: integer (nullable = true)

['Pkey', 'Year', 'Quarter', 'Year ending', 'Airport of departure', 'Purpose of visit', 'Country of permanent residence', 'Total visitor spend', 'Total visitors']


In [2]:
# Import pandas.
import pandas as pd

# Take the first five rows of data, and visualise.
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,Pkey,Year,Quarter,Year ending,Airport of departure,Purpose of visit,Country of permanent residence,Total visitor spend,Total visitors
0,1,1997,4,YEDec 1997,Auckland,Business,Africa and Middle East,4266524.0,1684
1,2,1997,4,YEDec 1997,Auckland,Business,Australia,132588500.0,67277
2,3,1997,4,YEDec 1997,Auckland,Business,Canada,15589690.0,3596
3,4,1997,4,YEDec 1997,Auckland,Business,China,9490118.0,4510
4,5,1997,4,YEDec 1997,Auckland,Business,Germany,5481106.0,1889


In [3]:
# To visualise the first five columns, simply add transpose. 
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
Pkey,1,2,3,4,5
Year,1997,1997,1997,1997,1997
Quarter,4,4,4,4,4
Year ending,YEDec 1997,YEDec 1997,YEDec 1997,YEDec 1997,YEDec 1997
Airport of departure,Auckland,Auckland,Auckland,Auckland,Auckland
Purpose of visit,Business,Business,Business,Business,Business
Country of permanent residence,Africa and Middle East,Australia,Canada,China,Germany
Total visitor spend,4.26652e+06,1.32589e+08,1.55897e+07,9.49012e+06,5.48111e+06
Total visitors,1684,67277,3596,4510,1889


In [4]:
# We can use group by and count to find out how many data points we have for each class in our predictor. 
df.groupby('year').count().toPandas()

Unnamed: 0,year,count
0,2003,832
1,2007,832
2,2018,208
3,2015,832
4,2006,832
5,2013,832
6,1997,208
7,2014,832
8,2004,832
9,1998,832


In [5]:
# Using a for loop to find all columns that belong to the integer data type. 
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']

# Selecting the numeric features, generating summary statistics, and converting to a Pandas DataFrame.
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Pkey,17056,8528.5,4923.787431100846,1,17056
Year,17056,2007.5,5.9270657557172814,1997,2018
Quarter,17056,2.5,1.1289220372929016,1,4
Total visitors,11292,15576.741232731138,33229.303734004156,1,314584


In [17]:
# Using a for loop to find all columns that belong to the integer data type. 
numeric_features = [t[0] for t in df.dtypes if t[1] == 'double']

# Selecting the numeric features, generating summary statistics, and converting to a Pandas DataFrame.
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Total visitor spend,11292,5.09246119696264E7,1.0193863986366412E8,2.4,1.235333092E9


In [18]:
# Import the relevant packages.
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)

# First create a string indexer which converts every string into a number, such as male = 0 and female = 1.
# A number will be assigned to every category in the column.
country_indexer = StringIndexer(inputCol='Country of permanent residence',outputCol='countryIndex')

In [19]:
# Now we can one hot encode these numbers. This converts the various outputs into a single vector.
# Multiple columns are collapsed into one. 
# This makes it easier to process when you have multiple classes.
country_encoder = OneHotEncoder(inputCol='countryIndex',outputCol='countryVec')

In [20]:
# And finally, using vector assembler to turn all of these columns into one column (named features).
assembler = VectorAssembler(inputCols=['countryVec'], outputCol="features")

In [21]:
from pyspark.ml import Pipeline

# Then go through our steps. It's essentially sequential to the above.
pipeline = Pipeline(stages=[country_indexer, country_encoder, assembler])

# Now that we've got a number of steps, let's apply it to the DataFrame.
pipeline_model = pipeline.fit(df)

# Incorporate results into a new DataFrame.
pipe_df = pipeline_model.transform(df)

# Remove all variables other than features and label. 
pipe_df = pipe_df.select('label', 'features')

AnalysisException: "cannot resolve '`label`' given input columns: [Pkey, Year ending, Country of permanent residence, Quarter, Purpose of visit, Total visitors, features, Year, Total visitor spend, countryVec, Airport of departure, countryIndex];;\n'Project ['label, features#416]\n+- Project [Pkey#0, Year#1, Quarter#2, Year ending#3, Airport of departure#4, Purpose of visit#5, Country of permanent residence#6, Total visitor spend#7, Total visitors#8, countryIndex#391, countryVec#403, UDF(named_struct(countryVec, countryVec#403)) AS features#416]\n   +- Project [Pkey#0, Year#1, Quarter#2, Year ending#3, Airport of departure#4, Purpose of visit#5, Country of permanent residence#6, Total visitor spend#7, Total visitors#8, countryIndex#391, if (isnull(cast(countryIndex#391 as double))) null else UDF(cast(countryIndex#391 as double)) AS countryVec#403]\n      +- Project [Pkey#0, Year#1, Quarter#2, Year ending#3, Airport of departure#4, Purpose of visit#5, Country of permanent residence#6, Total visitor spend#7, Total visitors#8, UDF(cast(Country of permanent residence#6 as string)) AS countryIndex#391]\n         +- Relation[Pkey#0,Year#1,Quarter#2,Year ending#3,Airport of departure#4,Purpose of visit#5,Country of permanent residence#6,Total visitor spend#7,Total visitors#8] csv\n"