# PySpark - PyCon UK 2018

## PySpark Solutions

In [3]:
%fs ls dbfs:/FileStore/tables

path,name,size
dbfs:/FileStore/tables/01_pyspark_ml.ipynb,01_pyspark_ml.ipynb,19104
dbfs:/FileStore/tables/births.csv,births.csv,1908068
dbfs:/FileStore/tables/births_transformed.csv,births_transformed.csv,1908068


In [4]:
# Load the complete births data set from CSV, treating the first row as the header.
# The accompanying report is in the file natality.pdf; the two-row preview below
# is just a quick look at the data.
df = (
    spark.read
    .option('header', True)
    .csv('dbfs:/FileStore/tables/births.csv')
)
df.show(n=2)

In [5]:
# Column headings of the DataFrame, in order, as a plain Python list
df.schema.names

In [6]:
# Number of rows for each value of INFANT_ALIVE_AT_REPORT (alive vs. dead in the report)
(
    df.groupBy('INFANT_ALIVE_AT_REPORT')
    .count()
    .show()
)

In [7]:
# What is the format of the DataFrame (what data types are used for the different columns)?
# printSchema() prints one line per column with its name and data type.
# NOTE(review): `df` was read with header=True only (no schema / inferSchema),
# so presumably every column is reported as a string here — confirm from the output.
df.printSchema()

In [8]:
# Project the DataFrame down to INFANT_ALIVE_AT_REPORT, MOTHER_AGE_YEARS and
# MOTHER_PRE_WEIGHT and display the result
df.select(
    'INFANT_ALIVE_AT_REPORT',
    'MOTHER_AGE_YEARS',
    'MOTHER_PRE_WEIGHT',
).show()

In [9]:
# Rows where the mother was younger than 20 at the time of the birth
df.where(df['MOTHER_AGE_YEARS'] < 20).show()

In [10]:
# Replace MOTHER_HEIGHT_IN (mother's height in inches) with an integer-typed
# version of itself, then print the schema to confirm the new type.
from pyspark.sql.types import IntegerType

df = df.withColumn(
    'MOTHER_HEIGHT_IN',
    df['MOTHER_HEIGHT_IN'].cast(IntegerType()),
)
df.printSchema()

In [11]:
# Derive the mother's height in feet (inches divided by 12) as a new column
# and preview the first two rows.
df = df.withColumn(
    'MOTHER_HEIGHT_FEET',
    df['MOTHER_HEIGHT_IN'] / 12,
)
df.show(n=2)

In [12]:
# Count the number of mothers who smoked between 40 and 60 cigarettes (inclusive)
# in their third trimester.
#
# DataFrames are immutable: withColumn() returns a NEW DataFrame, so its result
# has to be bound to a name for the integer cast to take effect. Previously the
# result was discarded and the filter ran against the uncast column.
# A separate name is used so this cell leaves `df` itself untouched.
from pyspark.sql.types import IntegerType

df_cig_int = df.withColumn('CIG_3_TRI', df.CIG_3_TRI.cast(IntegerType()))

# Both bounds are inclusive (>= 40 and <= 60); the last expression is the count.
df_cig_int.filter((df_cig_int.CIG_3_TRI >= 40) & (df_cig_int.CIG_3_TRI <= 60)).count()

In [13]:
# Number of rows per BIRTH_PLACE value, listed in BIRTH_PLACE order
(
    df.groupBy('BIRTH_PLACE')
    .count()
    .sort('BIRTH_PLACE')
    .show()
)

In [14]:
# The numeric BIRTH_PLACE code isn't particularly meaningful on its own, so:
#   1. add a BIRTH_PLACE_TEXT column holding the text description of each code
#      (any code outside 1-7 is labelled 'Unknown');
#   2. show the counts per description, largest first.
from pyspark.sql.functions import when

df = df.withColumn(
    'BIRTH_PLACE_TEXT',
    (
        when(df['BIRTH_PLACE'] == 1, 'HOSPITAL')
        .when(df['BIRTH_PLACE'] == 2, 'Freestanding Birth Center')
        .when(df['BIRTH_PLACE'] == 3, 'Home (intended)')
        .when(df['BIRTH_PLACE'] == 4, 'Home (not intended)')
        .when(df['BIRTH_PLACE'] == 5, 'Home (unknown if intended)')
        .when(df['BIRTH_PLACE'] == 6, 'Clinic / Doctor’s Office')
        .when(df['BIRTH_PLACE'] == 7, 'Other')
        .otherwise('Unknown')
    ),
)

(
    df.groupBy('BIRTH_PLACE_TEXT')
    .count()
    .sort('count', ascending=False)
    .show()
)
df.printSchema()

### How to explicitly define the schema

In [16]:
from pyspark.sql.types import *

# (column name, Spark data type) pairs, in the order the columns are declared
# to the CSV reader. All columns are integers except the final two.
# NOTE(review): MOTHER_HEIGHT_FEET and BIRTH_PLACE_TEXT are columns that earlier
# cells add to `df` with withColumn(); confirm they really exist in births.csv
# before relying on the values read for them here.
COL_HEADINGS = [
    (column_name, IntegerType())
    for column_name in (
        'INFANT_ALIVE_AT_REPORT',
        'BIRTH_PLACE',
        'MOTHER_AGE_YEARS',
        'FATHER_COMBINED_AGE',
        'CIG_BEFORE',
        'CIG_1_TRI',
        'CIG_2_TRI',
        'CIG_3_TRI',
        'MOTHER_HEIGHT_IN',
        'MOTHER_PRE_WEIGHT',
        'MOTHER_DELIVERY_WEIGHT',
        'MOTHER_WEIGHT_GAIN',
        'DIABETES_PRE',
        'DIABETES_GEST',
        'HYP_TENS_PRE',
        'HYP_TENS_GEST',
    )
] + [
    ('MOTHER_HEIGHT_FEET', DoubleType()),
    ('BIRTH_PLACE_TEXT', StringType()),
]

# One StructField per heading; the third argument (nullable) is False for all.
schema = StructType([
    StructField(name, data_type, False)
    for name, data_type in COL_HEADINGS
])

# Read the CSV with the hand-written schema instead of letting Spark choose types.
births = (
    spark.read
    .option('header', True)
    .schema(schema)
    .csv('dbfs:/FileStore/tables/births.csv')
)

In [17]:
# Show the schema of `births`, the DataFrame that was just read with the
# explicitly defined schema. Printing `df` here would only repeat the schema
# built up by the earlier cells and would not demonstrate the explicit schema.
births.printSchema()

### How to infer the schema

In [19]:
# Re-read the CSV, this time asking Spark to infer each column's type from the
# data, and print the resulting schema.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('dbfs:/FileStore/tables/births.csv')
)
df.printSchema()