We work with Gapminder data.  The same dataset is behind our Bokeh app.

In [1]:
# Sanity check: `sc` is not created in this notebook, so confirm the kernel provides a SparkContext.
type(sc)

pyspark.context.SparkContext

In [2]:
# Likewise confirm that `spark` is the pre-created SparkSession used for all reads and SQL below.
type(spark)

pyspark.sql.session.SparkSession

In [3]:
# Load the Gapminder CSV, taking column names from the first row.
# No schema is supplied or inferred, so every column arrives as a string
# (visible as `object` dtypes after toPandas() further down).
df = spark.read.csv('gapminder.csv', header = True)

In [4]:
# Print a preview of the first rows as a text table.
df.show()

+-----------+----+-----------------+------------------+----------+---------------+------+----------+
|    Country|Year|        fertility|              life|population|child_mortality|   gdp|    region|
+-----------+----+-----------------+------------------+----------+---------------+------+----------+
|Afghanistan|1964|            7.671|            33.639|10474903.0|          339.7|1182.0|South Asia|
|Afghanistan|1965|            7.671|            34.152|10697983.0|          334.1|1182.0|South Asia|
|Afghanistan|1966|            7.671|            34.662|10927724.0|          328.7|1168.0|South Asia|
|Afghanistan|1967|            7.671|             35.17|11163656.0|          323.3|1173.0|South Asia|
|Afghanistan|1968|            7.671|            35.674|11411022.0|          318.1|1187.0|South Asia|
|Afghanistan|1969|            7.671|            36.172|11676990.0|          313.0|1178.0|South Asia|
|Afghanistan|1970|            7.671|36.663000000000004|11964906.0|          307.8|1174.0|So

In [5]:
# Per-column summary statistics (count, mean, stddev, min, ...); string columns report null for mean/stddev.
df.describe().show()

+-------+-----------+------------------+-----------------+------------------+-------------------+-----------------+------------------+------------------+
|summary|    Country|              Year|        fertility|              life|         population|  child_mortality|               gdp|            region|
+-------+-----------+------------------+-----------------+------------------+-------------------+-----------------+------------------+------------------+
|  count|      10111|             10111|            10100|             10111|              10108|             9210|              9000|             10111|
|   mean|       null|1988.5145880723965|4.028718761955669| 64.07859968299091|2.560448031559161E7|80.83450488599334|12746.916666666666|              null|
| stddev|       null|14.430849463812343|2.013967511764119|11.122778564950746|1.032383024935289E8| 79.2209420722028| 17797.80995341417|              null|
|    min|Afghanistan|              1964|            0.836|              16.1

In [6]:
# `df` is a Spark DataFrame, not a pandas one.
type(df)

pyspark.sql.dataframe.DataFrame

In [128]:
# Register `df` as the temporary view 'gapminder' so it can be queried through spark.sql.
df.createOrReplaceTempView('gapminder') # replaces the older DataFrame.registerTempTable API

In [7]:
# List the tables/views known to the session catalog.
# NOTE(review): the recorded output `[]` comes from execution 7, i.e. before the view above
# was registered (execution 128); re-run top to bottom to refresh it.
spark.catalog.listTables()

[]

In [8]:
# *****Converting spark dataframe to pandas dataframe*****
# toPandas() collects the whole Spark DataFrame onto the driver, so only use it on data
# that fits in local memory (about 10k rows here).
import pandas as pd
gap_df = df.toPandas()

In [9]:
gap_df.info() # every column is still `object` (string), so the numeric columns need converting

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10111 entries, 0 to 10110
Data columns (total 8 columns):
Country            10111 non-null object
Year               10111 non-null object
fertility          10100 non-null object
life               10111 non-null object
population         10108 non-null object
child_mortality    9210 non-null object
gdp                9000 non-null object
region             10111 non-null object
dtypes: object(8)
memory usage: 632.0+ KB


In [10]:
# Convert the string-typed measurement columns to numeric dtypes in one pass.
numeric_cols = ['Year', 'fertility', 'life', 'population', 'child_mortality', 'gdp']
gap_df[numeric_cols] = gap_df[numeric_cols].apply(pd.to_numeric)
gap_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10111 entries, 0 to 10110
Data columns (total 8 columns):
Country            10111 non-null object
Year               10111 non-null int64
fertility          10100 non-null float64
life               10111 non-null float64
population         10108 non-null float64
child_mortality    9210 non-null float64
gdp                9000 non-null float64
region             10111 non-null object
dtypes: float64(5), int64(1), object(2)
memory usage: 632.0+ KB


# Queries !

In [101]:
# ****Querying a table*****
# SQL against the 'gapminder' temp view: up to 10 Country values that start with 'Par'.
query = "SELECT Country FROM gapminder WHERE Country like 'Par%' LIMIT 10 "

# spark.sql returns the result as a new Spark DataFrame (see the type printed below).
gapminder10 = spark.sql(query)
gapminder10.show()

print(type(gapminder10))

+--------+
| Country|
+--------+
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
|Paraguay|
+--------+

<class 'pyspark.sql.dataframe.DataFrame'>


In [24]:
# filtering a dataframe (different from querying a table)
# can chain filters
# The filter condition is a SQL expression string applied directly to the DataFrame (no temp view needed).
df.filter("Country == 'Paraguay'").show()

+--------+----+------------------+-----------------+----------+---------------+------+-------+
| Country|Year|         fertility|             life|population|child_mortality|   gdp| region|
+--------+----+------------------+-----------------+----------+---------------+------+-------+
|Paraguay|1964|              6.41|64.64399999999999| 2118896.0|           82.0|3163.0|America|
|Paraguay|1965|             6.356|           64.779| 2176195.0|           81.3|3256.0|America|
|Paraguay|1966|             6.276|           64.904| 2235296.0|           80.6|3203.0|America|
|Paraguay|1967|              6.17|           65.031| 2296118.0|           80.1|3315.0|America|
|Paraguay|1968|             6.041|            65.17| 2358072.0|           79.4|3339.0|America|
|Paraguay|1969|             5.894|65.32300000000001| 2420365.0|           78.7|3375.0|America|
|Paraguay|1970|             5.739|           65.487| 2482508.0|           78.0|3487.0|America|
|Paraguay|1971|5.5889999999999995|           65.65

In [25]:
# Project the DataFrame down to three columns.
df.select("Country","Year","fertility").show()

+-----------+----+-----------------+
|    Country|Year|        fertility|
+-----------+----+-----------------+
|Afghanistan|1964|            7.671|
|Afghanistan|1965|            7.671|
|Afghanistan|1966|            7.671|
|Afghanistan|1967|            7.671|
|Afghanistan|1968|            7.671|
|Afghanistan|1969|            7.671|
|Afghanistan|1970|            7.671|
|Afghanistan|1971|            7.671|
|Afghanistan|1972|            7.671|
|Afghanistan|1973|            7.671|
|Afghanistan|1974|            7.671|
|Afghanistan|1975|            7.671|
|Afghanistan|1976|             7.67|
|Afghanistan|1977|             7.67|
|Afghanistan|1978|             7.67|
|Afghanistan|1979|            7.669|
|Afghanistan|1980|            7.669|
|Afghanistan|1981|             7.67|
|Afghanistan|1982|            7.671|
|Afghanistan|1983|7.672999999999999|
+-----------+----+-----------------+
only showing top 20 rows



In [29]:
# Define total_gdp as a Column expression: population multiplied by gdp
total_gdp = (df.population * df.gdp).alias("total_gdp")

# Select columns. Keep the DataFrame in gdp1 and display it separately:
# show() only prints and returns None, so chaining it into the assignment
# would leave gdp1 bound to None instead of the selected table.
gdp1 = df.select(df.Country, df.Year, total_gdp)
gdp1.show()

# Create the same table using SQL "as" instead of .alias
gdp2 = df.selectExpr("Country", "Year", "(population * gdp) as total_gdp")

+-----------+----+---------------+
|    Country|Year|      total_gdp|
+-----------+----+---------------+
|Afghanistan|1964|1.2381335346E10|
|Afghanistan|1965|1.2645015906E10|
|Afghanistan|1966|1.2763581632E10|
|Afghanistan|1967|1.3094968488E10|
|Afghanistan|1968|1.3544883114E10|
|Afghanistan|1969| 1.375549422E10|
|Afghanistan|1970|1.4046799644E10|
|Afghanistan|1971|1.3402226292E10|
|Afghanistan|1972|1.3172997648E10|
|Afghanistan|1973|1.4684922363E10|
|Afghanistan|1974| 1.547199576E10|
|Afghanistan|1975|1.6220158344E10|
|Afghanistan|1976|1.6946920952E10|
|Afghanistan|1977|1.5669813552E10|
|Afghanistan|1978|1.6717814424E10|
|Afghanistan|1979|1.6458164415E10|
|Afghanistan|1980|1.6427074182E10|
|Afghanistan|1981|1.7955574128E10|
|Afghanistan|1982| 1.916936374E10|
|Afghanistan|1983|1.9338281424E10|
+-----------+----+---------------+
only showing top 20 rows



In [40]:
# "like" is sensitive to capitalization
# Keep rows whose Country starts with 'Par'; % is the SQL wildcard for any suffix.
gdp2.filter("Country like 'Par%'").show()

+--------+----+---------------+
| Country|Year|      total_gdp|
+--------+----+---------------+
|Paraguay|1964|  6.702068048E9|
|Paraguay|1965|   7.08569092E9|
|Paraguay|1966|  7.159653088E9|
|Paraguay|1967|   7.61163117E9|
|Paraguay|1968|  7.873602408E9|
|Paraguay|1969|  8.168731875E9|
|Paraguay|1970|  8.656505396E9|
|Paraguay|1971|  9.004497118E9|
|Paraguay|1972|  9.429799626E9|
|Paraguay|1973|1.0103243725E10|
|Paraguay|1974|1.0874100319E10|
|Paraguay|1975|1.1525173029E10|
|Paraguay|1976|1.2308180472E10|
|Paraguay|1977|1.3650800448E10|
|Paraguay|1978|1.5190831854E10|
|Paraguay|1979|1.6931454291E10|
|Paraguay|1980|1.9446552816E10|
|Paraguay|1981|2.1162776849E10|
|Paraguay|1982|2.0442409788E10|
|Paraguay|1983|1.9830641436E10|
+--------+----+---------------+
only showing top 20 rows



In [76]:
# Cast Year from string to int, then aggregate over the whole table
# (groupBy() with no columns forms a single global group) to get the latest year.
df2 = df.selectExpr("cast(Year as int)").groupBy().max("Year")
df2.show()

+---------+
|max(Year)|
+---------+
|     2013|
+---------+



In [88]:
# df.schema.names would give the column names; here we inspect the column data types instead
# to confirm the aggregated column kept the integer type from the cast.
types = [f.dataType for f in df2.schema.fields]
print(types)

[IntegerType]


In [108]:
from pyspark.sql.functions import desc

# Lowest gdp within each region, ordered from the highest regional minimum to the lowest.
region_gdp = df.selectExpr("region", "cast(gdp as float)")
min_gdp_per_region = region_gdp.groupBy("region").min("gdp")
df3 = min_gdp_per_region.sort(desc("min(gdp)"))
df3.show()

+--------------------+--------+
|              region|min(gdp)|
+--------------------+--------+
|             America|  1518.0|
|Middle East & Nor...|  1202.0|
|Europe & Central ...|  1040.0|
|          South Asia|   725.0|
| East Asia & Pacific|   669.0|
|  Sub-Saharan Africa|   142.0|
+--------------------+--------+



In [115]:
# conventional sql query producing same result
query = "select region, min(cast(gdp as float)) as min_gdp from gapminder group by region order by min_gdp desc"

# The query returns the *minimum* gdp per region, so bind the result to a name that says so.
min_gdp_by_region = spark.sql(query)
max_gdp = min_gdp_by_region  # backward-compatible alias for the old, misleading name
min_gdp_by_region.show()

+--------------------+-------+
|              region|min_gdp|
+--------------------+-------+
|             America| 1518.0|
|Middle East & Nor...| 1202.0|
|Europe & Central ...| 1040.0|
|          South Asia|  725.0|
| East Asia & Pacific|  669.0|
|  Sub-Saharan Africa|  142.0|
+--------------------+-------+



In [116]:
import pyspark.sql.functions as F

# Group rows by country; aggregations on this object are computed once per country.
by_country = df.groupBy("Country")

# Standard deviation
# F.stddev is the sample standard deviation (the output column is named stddev_samp);
# the output shows null for at least one country (Tokelau).
by_country.agg(F.stddev("child_mortality")).show()

+----------------+----------------------------+
|         Country|stddev_samp(child_mortality)|
+----------------+----------------------------+
|            Chad|          35.739075467982104|
|        Paraguay|           20.19840254342217|
|          Russia|           9.231132658740492|
|Congo, Dem. Rep.|          45.122435491603035|
|         Senegal|           77.17819603266939|
|    Macao, China|           5.980079296775411|
|          Sweden|            4.05458923659981|
|         Tokelau|                        null|
|        Kiribati|           32.83193697780084|
|  Macedonia, FYR|            44.5864241258091|
|          Guyana|          13.231326618356647|
|         Eritrea|           64.75510396588561|
|     Philippines|          21.822733044946958|
|        Djibouti|            41.4177919394897|
|           Tonga|          15.955612025304482|
|        Malaysia|           20.08011759051673|
|       Singapore|          10.118960985623545|
|            Fiji|          13.395933589

# PySpark ML

Binary classification: predicting whether life expectancy is above 75 years, using pyspark.ml pipeline!

We use logistic regression: the model predicts the probability that life expectancy is above 75 years.  (This is similar to the softmax regression I did in Keras for digit recognition.)

In [8]:
# The CSV was read with every column as a string; cast the numeric ones to Spark numeric types.
numeric_casts = [
    ("Year", "integer"),
    ("fertility", "float"),
    ("life", "float"),
    ("population", "float"),
    ("child_mortality", "float"),
    ("gdp", "float"),
]
for col_name, spark_type in numeric_casts:
    df = df.withColumn(col_name, df[col_name].cast(spark_type))
df.dtypes

[('Country', 'string'),
 ('Year', 'int'),
 ('fertility', 'float'),
 ('life', 'float'),
 ('population', 'float'),
 ('child_mortality', 'float'),
 ('gdp', 'float'),
 ('region', 'string')]

In [9]:
# df.dtypes (previous cell) reports each type as a short string such as 'string' or 'int';
# the schema fields hold the corresponding DataType objects (StringType, IntegerType, ...).
[f.dataType for f in df.schema.fields]

[StringType,
 IntegerType,
 FloatType,
 FloatType,
 FloatType,
 FloatType,
 FloatType,
 StringType]

In [132]:
df.count() # sanity check: the casts above must not change the row count

10111

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Stage 1: map each region string to a numeric category index.
reg_indexer = StringIndexer(inputCol = "region", outputCol = "region_index")

# Stage 2: one-hot encode that index into a vector column so the model treats region as categorical.
reg_encoder = OneHotEncoder(inputCol = "region_index", outputCol = "region_fact")

In [11]:
# Stage 3: pack the predictors (numeric columns plus the encoded region) into one "features" vector.
feature_cols = ["Year", "fertility", "child_mortality", "gdp", "region_fact"]
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [12]:
# Boolean target: is life expectancy above 75 years?
is_above_75 = df["life"] > 75

df = (
    df
    # Create above_75
    .withColumn("above_75", is_above_75)
    # Convert to an integer label (1 = above 75, 0 = not)
    .withColumn("label", is_above_75.cast("integer"))
    # Remove rows with missing values in any predictor
    .filter("fertility is not NULL and gdp is not NULL and child_mortality is not NULL")
)

df.count()

8836

In [13]:
from pyspark.ml import Pipeline

# Chain the feature-engineering stages in the order they must run:
# index region -> one-hot encode -> assemble the feature vector.
df_pipe = Pipeline(stages=[reg_indexer, reg_encoder, vec_assembler])

In [14]:
# Fit the pipeline stages on df, then apply the fitted stages to the same data.
pipe_model = df_pipe.fit(df)
piped_data = pipe_model.transform(df)

In [138]:
# Confirm the pipeline appended region_index, region_fact and features to the original columns.
piped_data.schema.names

['Country',
 'Year',
 'fertility',
 'life',
 'population',
 'child_mortality',
 'gdp',
 'region',
 'above_75',
 'label',
 'region_index',
 'region_fact',
 'features']

In [15]:
# Split the data into training and test sets (roughly 60% / 40%).
# A fixed seed keeps the split, and every metric computed from it, reproducible across re-runs;
# without it each run trains and evaluates on different rows.
training, test = piped_data.randomSplit([.6, .4], seed=42)
print(training.count())
print(test.count())

5247
3589


In [17]:
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression instance
# Left at default settings here; regParam and elasticNetParam are tuned by cross-validation below.
lr = LogisticRegression()

# Uncomment to print every parameter with its documentation:
# lr.explainParams()

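Cross-validation on the training set, with hyperparameter tuning of `elasticNetParam` and `regParam`.  Elastic net combines L^1 and L^2 regularization: `regParam` sets the overall regularization strength and `elasticNetParam` sets the mix between the two (0 = pure L^2, 1 = pure L^1).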

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create a BinaryClassificationEvaluator instance
# It scores predictions by the area under the ROC curve.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

# Another option is area under precision - recall curve
evaluator.getMetricName()

'areaUnderROC'

The ROC curve is created by plotting the true positive rate against the false positive rate at various threshold settings (i.e., various probability cutoffs).  The ideal AUC is 1.  I have seen ROC curves, along with the confusion matrix, in scikit-learn.

In [22]:
# Import the tuning submodule
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import numpy as np

# Build the hyperparameter grid in a single fluent chain.
grid = (
    ParamGridBuilder()
    # regularization strengths to try
    .addGrid(lr.regParam, np.arange(0, .02, .01))
    # either all L^1, when elasticNetParam = 1, or all L^2, when elasticNetParam = 0
    .addGrid(lr.elasticNetParam, [0.0,1.0])
    .build()
)

In [23]:
# Create the CrossValidator
# numFolds = 3, by default
# Rows are assigned to folds at random, so pin the seed to make model selection reproducible.
cv = CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator,
               seed=42)

In [24]:
# Fit cross validation models
# Every grid combination is evaluated on the folds of `training`; the test set is not touched here.
models = cv.fit(training)

# Extract the best model
# NOTE(review): despite the plural name, `models` is the single fitted cross-validation result;
# bestModel is the winning logistic-regression model.
best_lr = models.bestModel

In [25]:
# regParam selected by cross-validation.
# NOTE(review): this goes through the private `_java_obj` handle; prefer a public getter
# on the fitted model if the installed Spark version provides one.
best_lr._java_obj.getRegParam()

0.01

In [26]:
# elasticNetParam selected by cross-validation (same private-handle caveat as for regParam).
best_lr._java_obj.getElasticNetParam()

1.0

In [27]:
# Score the held-out test set with the best cross-validated model
test_results = best_lr.transform(test)

# Report the evaluator's metric on those predictions
test_metric = evaluator.evaluate(test_results)
print(test_metric)

0.9563231561916211


In [41]:
test_results.select("Country", "Year", "label", "probability", "prediction").filter(test_results.Year == 2010).show(50)
# "probability" is the vector [P(label = 0), P(label = 1)]; the truncated display shows only its
# first entry, i.e. the probability that life expectancy is <= 75.
# "prediction" is 1.0 when P(label = 1) exceeds the threshold, which defaults to 0.5.

+--------------------+----+-----+--------------------+----------+
|             Country|Year|label|         probability|prediction|
+--------------------+----+-----+--------------------+----------+
|             Austria|2010|    1|[0.27816036575243...|       1.0|
|          Bangladesh|2010|    0|[0.92898875397119...|       0.0|
|            Barbados|2010|    0|[0.71042576196812...|       0.0|
|             Belarus|2010|    0|[0.57575700236623...|       0.0|
|              Belize|2010|    0|[0.91366541289979...|       0.0|
|               Benin|2010|    0|[0.99893292146753...|       0.0|
|             Bolivia|2010|    0|[0.97348423925998...|       0.0|
|              Brunei|2010|    1|[0.19527484742281...|       1.0|
|            Bulgaria|2010|    0|[0.62042812164220...|       0.0|
|            Cameroon|2010|    0|[0.99866104485070...|       0.0|
|              Canada|2010|    1|[0.34764474536552...|       1.0|
|          Cape Verde|2010|    0|[0.90004004480125...|       0.0|
|         

# CMS Physician Compare

In [42]:
# Load the CMS Physician Compare national file, taking column names from the header row.
df_phys = spark.read.csv('Physician_Compare_National_Downloadable_File.csv', header = True)
# No memory error or warning here, presumably because Spark reads the file lazily and in
# partitions rather than loading it all at once (TODO confirm).

In [43]:
# Number of partitions Spark split this file into.
df_phys.rdd.getNumPartitions()

6

In [44]:
# Column names exactly as they appear in the CSV header; note that most contain spaces.
df_phys.columns

['NPI',
 'PAC ID',
 'Professional Enrollment ID',
 'Last Name',
 'First Name',
 'Middle Name',
 'Suffix',
 'Gender',
 'Credential',
 'Medical school name',
 'Graduation year',
 'Primary specialty',
 'Secondary specialty 1',
 'Secondary specialty 2',
 'Secondary specialty 3',
 'Secondary specialty 4',
 'All secondary specialties',
 'Organization legal name',
 'Group Practice PAC ID',
 'Number of Group Practice members',
 'Line 1 Street Address',
 'Line 2 Street Address',
 'Marker of address line 2 suppression',
 'City',
 'State',
 'Zip Code',
 'Phone Number',
 'Hospital affiliation CCN 1',
 'Hospital affiliation LBN 1',
 'Hospital affiliation CCN 2',
 'Hospital affiliation LBN 2',
 'Hospital affiliation CCN 3',
 'Hospital affiliation LBN 3',
 'Hospital affiliation CCN 4',
 'Hospital affiliation LBN 4',
 'Hospital affiliation CCN 5',
 'Hospital affiliation LBN 5',
 'Professional accepts Medicare Assignment',
 'Reported Quality Measures',
 'Used electronic health records',
 'Committed to 

In [46]:
# Load the 2015 individual performance-score file, taking column names from the header row.
df_ind = spark.read.csv('Physician_Compare_2015_Individual_EP_Public_Reporting___Performance_Scores.csv', header = True)

In [47]:
# Number of partitions Spark split this file into.
df_ind.rdd.getNumPartitions()

4

In [51]:
# Column names of the performance-score file; NPI also appears in the physician file.
df_ind.columns

['NPI',
 'PAC ID',
 'Last Name',
 'First Name',
 'Measure Identifier',
 'Measure Title',
 'Inverse Measure',
 'Measure Performance Rate',
 'Reporting Mechanism',
 'Reported on PC Live Site']

In [68]:
# Column names containing spaces are awkward to reference in SQL, so rename the ones the
# join query uses. The new names must be exactly the identifiers that query references
# (Last_Name, Measure_Title, Measure_Performance_Rate); otherwise it fails to resolve them.
df_phys = df_phys.withColumnRenamed("Last Name", "Last_Name")
df_ind = (
    df_ind
    .withColumnRenamed("Measure Title", "Measure_Title")
    .withColumnRenamed("Measure Performance Rate", "Measure_Performance_Rate")
)

In [69]:
# Expose both DataFrames to spark.sql under the view names 'physician' and 'individual'.
df_phys.createOrReplaceTempView('physician')
df_ind.createOrReplaceTempView('individual')

In [70]:
# Verify that both temporary views are now registered in the session catalog.
spark.catalog.listTables()

[Table(name='individual', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='physician', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [72]:
# *Querying multiple tables*
# Inner join of the two temp views on NPI, returning 10 rows.
# The column identifiers here must match the names the columns have in the registered views.
# NOTE(review): the recorded output repeats identical rows, which suggests NPI is not unique
# in at least one of the tables - confirm whether de-duplication (e.g. DISTINCT) is wanted.
join_query = "SELECT physician.NPI, physician.Last_Name, individual.Measure_Title, individual.Measure_Performance_Rate FROM physician JOIN individual ON physician.NPI == individual.NPI LIMIT 10"

cms10 = spark.sql(join_query)
cms10.show()

+----------+---------+--------------------+------------------------+
|       NPI|Last_Name|       Measure_Title|Measure_Performance_Rate|
+----------+---------+--------------------+------------------------+
|1003817859|   BROWNE|Radiology: Exposu...|                     100|
|1003817859|   BROWNE|Radiology: Stenos...|                     100|
|1003817859|   BROWNE|Radiology: Exposu...|                     100|
|1003817859|   BROWNE|Radiology: Stenos...|                     100|
|1003846908|  KAIROUZ|           Care Plan|                      80|
|1003846908|  KAIROUZ|Hypertension (HTN...|                      81|
|1003846908|  KAIROUZ|           Care Plan|                      80|
|1003846908|  KAIROUZ|Hypertension (HTN...|                      81|
|1003846908|  KAIROUZ|           Care Plan|                      80|
|1003846908|  KAIROUZ|Hypertension (HTN...|                      81|
+----------+---------+--------------------+------------------------+

