# SQL and pandas on Databricks for Data Scientists

### Explore Single-Family Mortgages from the Census Tract Dataset, 2010-2015

[Dataset Reference - https://www.fhfa.gov/DataTools/Downloads/Pages/Public-Use-Databases.aspx](https://www.fhfa.gov/DataTools/Downloads/Pages/Public-Use-Databases.aspx)

[Schema - https://www.fhfa.gov/DataTools/Downloads/Documents/Enterprise-PUDB/Single-Family_Census_Tract_File_/2015_Single_Family_Census_Tract_File.pdf](https://www.fhfa.gov/DataTools/Downloads/Documents/Enterprise-PUDB/Single-Family_Census_Tract_File_/2015_Single_Family_Census_Tract_File.pdf)

### 1. Ingest the Dataset

In [4]:
# Location of the tab-separated mortgage files (first row is a header).
# NOTE(review): absolute, user-specific path - point this at your own copy of the data.
MORTGAGE_DATA_PATH = "/home/silvio/mortgage_data"

# Read through the `spark` session (as every later cell does) instead of the legacy
# `sqlContext`. `inferSchema` needs an extra pass over the files but yields typed
# (numeric) columns, which `describe()` and the aggregations below rely on.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
    .load(MORTGAGE_DATA_PATH)
)

In [5]:
# Per-column summary statistics (count, mean, stddev, min, max) on the not-yet-cached data.
# NOTE(review): max values such as 999 / 9999 / 999999 look like missing-value placeholder
# codes rather than real measurements - confirm against the schema PDF before averaging them.
display(df.describe())

summary,fannieorfreddie,record,usps_code,msa_code,county_code,census_tract,minority_percent,census_median_income,local_median_income,tract_income_ratio,borrowers_annual_income,area_median_family_income,borrower_income_ratio,unpaid_balance,loan_purpose,federal_guarantee,number_of_borrowers,first_time_buyers,borrower_race_1,borrower_race_2,borrower_race_3,borrower_race_4,borrower_race_5,borrower_ethnicity,co_borrower_race_1,co_borrower_race_2,co_borrower_race_3,co_borrower_race_4,co_borrower_race_5,co_borrower_ethnicity,borrower_gender,co_borrower_gender,borrower_age,co_borrower_age,occupancy_code,rate_spread,hoepa_status,property_type,lien_status,year
count,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0
mean,1.0,1527826.6474815835,26.34971358038961,37417.891445221096,79.59858141608417,227233.1500790365,32.60364062413058,79648.51955186219,65299.31692772914,9.96716024317566,125496.39610362332,71266.98733175945,15.436795748739826,211409.4219233987,1.7212525776372325,1.0042655696600042,1.8592588311601048,1.9188794976571049,4.873367322343074,8.96878849628457,8.997387460436771,8.999422704421763,8.974751021335624,2.092704826780573,6.732621163311227,8.990364055392059,8.999129327555387,8.999677784466183,8.986113804460837,5.172269897897191,1.4120656806743677,2.870401523610049,103.0115942467961,506.43552271622417,1.0915901571597126,0.0307756261153226,1.9999961122923169,1.0888406194197613,1.0010139373740194,2012.304949063486
stddev,0.0,973932.0539615656,16.870274386483654,22996.78557760105,91.66069721015856,312855.4239868818,242.533562427931,41676.329294231065,30821.96112820461,295.52504671039435,383407.3056314145,24706.40240733825,371.6661442773271,119781.21084474264,0.5082840774485791,0.0904343394254452,5.440379817992817,0.5010964121864502,1.096606141136664,0.3957855836070374,0.123033522177165,0.052476000396396,0.326094090974203,0.710532873847303,2.171317353530967,0.2333814328680472,0.0705631891205686,0.0386462203270937,0.2421532048297687,3.460284076944615,0.7615375054073371,1.1408644827428915,222.19078139301308,475.4611251114248,0.2884465376789499,0.3066089878737382,0.0019717233058138,0.8207539501561228,0.031826237034804,1.5846875442585984
min,1.0,1.0,0.0,0.0,0.0,0.0,0.0,2499.0,12000.0,0.0341,0.0,16300.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,18.0,18.0,1.0,0.0,1.0,1.0,1.0,2010.0
max,1.0,3954618.0,87.0,99999.0,840.0,998900.0,9999.0,999999.0,999999.0,9999.0,22120000.0,999999.0,9999.0,1656000.0,9.0,4.0,99.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,999.0,999.0,2.0,99.99,2.0,9.0,2.0,2015.0


## Cache DataFrames that you query repeatedly to massively speed up subsequent operations.

In [7]:
# Baseline row count before caching: Spark has to read the source files for this action.
df.count()

In [8]:
# cache() only marks the DataFrame for caching (it is lazy); the count() action
# is what actually populates the cache. Its result is this cell's output.
df.cache()
df.count()

In [9]:
# Same count again, now answered from the cache populated by the previous cell.
df.count()

In [10]:
# Re-run the summary statistics against the cached DataFrame; the numbers match the
# uncached run, only the execution time changes.
display(df.describe())

summary,fannieorfreddie,record,usps_code,msa_code,county_code,census_tract,minority_percent,census_median_income,local_median_income,tract_income_ratio,borrowers_annual_income,area_median_family_income,borrower_income_ratio,unpaid_balance,loan_purpose,federal_guarantee,number_of_borrowers,first_time_buyers,borrower_race_1,borrower_race_2,borrower_race_3,borrower_race_4,borrower_race_5,borrower_ethnicity,co_borrower_race_1,co_borrower_race_2,co_borrower_race_3,co_borrower_race_4,co_borrower_race_5,co_borrower_ethnicity,borrower_gender,co_borrower_gender,borrower_age,co_borrower_age,occupancy_code,rate_spread,hoepa_status,property_type,lien_status,year
count,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0,17233806.0
mean,1.0,1527826.6474815835,26.34971358038961,37417.891445221096,79.59858141608417,227233.1500790365,32.60364062413058,79648.51955186219,65299.31692772914,9.96716024317566,125496.39610362332,71266.98733175945,15.436795748739826,211409.4219233987,1.7212525776372325,1.0042655696600042,1.8592588311601048,1.9188794976571049,4.873367322343074,8.96878849628457,8.997387460436771,8.999422704421763,8.974751021335624,2.092704826780573,6.732621163311227,8.990364055392059,8.999129327555387,8.999677784466183,8.986113804460837,5.172269897897191,1.4120656806743677,2.870401523610049,103.0115942467961,506.43552271622417,1.0915901571597126,0.0307756261153226,1.9999961122923169,1.0888406194197613,1.0010139373740194,2012.304949063486
stddev,0.0,973932.0539615656,16.870274386483654,22996.78557760105,91.66069721015856,312855.4239868818,242.533562427931,41676.329294231065,30821.96112820461,295.52504671039435,383407.3056314145,24706.40240733825,371.6661442773271,119781.21084474264,0.5082840774485791,0.0904343394254452,5.440379817992817,0.5010964121864502,1.096606141136664,0.3957855836070374,0.123033522177165,0.052476000396396,0.326094090974203,0.710532873847303,2.171317353530967,0.2333814328680472,0.0705631891205686,0.0386462203270937,0.2421532048297687,3.460284076944615,0.7615375054073371,1.1408644827428915,222.19078139301308,475.4611251114248,0.2884465376789499,0.3066089878737382,0.0019717233058138,0.8207539501561228,0.031826237034804,1.5846875442585984
min,1.0,1.0,0.0,0.0,0.0,0.0,0.0,2499.0,12000.0,0.0341,0.0,16300.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,18.0,18.0,1.0,0.0,1.0,1.0,1.0,2010.0
max,1.0,3954618.0,87.0,99999.0,840.0,998900.0,9999.0,999999.0,999999.0,9999.0,22120000.0,999999.0,9999.0,1656000.0,9.0,4.0,99.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,9.0,999.0,999.0,2.0,99.99,2.0,9.0,2.0,2015.0


### 2. Explore the Dataset Using PySpark

In [12]:
# Expose the DataFrame to SQL (the %sql cell and spark.sql/spark.table) under the name "mortgages".
df.createOrReplaceTempView(name="mortgages")

In [13]:
%sql select year, avg(unpaid_balance) from mortgages group by year order by year asc -- average unpaid_balance per year, oldest year first

year,avg(unpaid_balance)
2010,219642.78137108503
2011,208898.01220857457
2012,213157.7095436272
2013,204563.08591091988
2014,201935.24813276
2015,220038.7608634289


In [14]:
from pyspark.sql.types import *  # NOTE(review): star import; nothing in this cell uses pyspark.sql.types
from pyspark.sql import functions as f

# Same aggregation as the %sql cell, written with the DataFrame API.
# Use `spark.table` rather than the bare `table` helper: the latter is a
# notebook-provided global (Databricks), not part of PySpark itself.
avg_balance_by_year = (
    spark.table("mortgages")
    .groupBy("year")
    .agg(f.avg("unpaid_balance"))
    .orderBy("year")
)
display(avg_balance_by_year)

year,avg(unpaid_balance)
2010,219642.78137108503
2011,208898.01220857457
2012,213157.7095436272
2013,204563.08591091988
2014,201935.24813276
2015,220038.7608634289


You can access Spark tables from PySpark with either the `table` or the `sql` command (equivalently `spark.table(...)` or `spark.sql(...)`).

You can make a permanent table using `dataframe.write.saveAsTable('<example-table>')`. See [tables guide](https://docs.databricks.com/user-guide/tables.html) for more info.

# 3. Converting Spark DataFrames to pandas

Running `df.toPandas()` collects all of the data from the cluster onto a single node (the driver). Transferring the data can take a long time, and if the data does not fit in that node's memory, the task fails with an out-of-memory (OOM) error.

In [19]:
# Re-bind `df` to the registered "mortgages" view (the same data loaded earlier) and show it.
df = spark.read.table("mortgages")
display(df)

fannieorfreddie,record,usps_code,msa_code,county_code,census_tract,minority_percent,census_median_income,local_median_income,tract_income_ratio,borrowers_annual_income,area_median_family_income,borrower_income_ratio,unpaid_balance,loan_purpose,federal_guarantee,number_of_borrowers,first_time_buyers,borrower_race_1,borrower_race_2,borrower_race_3,borrower_race_4,borrower_race_5,borrower_ethnicity,co_borrower_race_1,co_borrower_race_2,co_borrower_race_3,co_borrower_race_4,co_borrower_race_5,co_borrower_ethnicity,borrower_gender,co_borrower_gender,borrower_age,co_borrower_age,occupancy_code,rate_spread,hoepa_status,property_type,lien_status,year
1,2581112,6,41940,85,508507,79.71,92600,99794,0.9279,92000,105000,0.8762,335000,2,1,1,2,2,9,9,9,9,2,9,9,9,9,9,9,2,4,53,999,1,0.0,2,1,1,2012
1,2581113,55,33340,89,630202,8.42,72356,68787,1.0519,67000,73200,0.9153,185000,2,1,2,2,5,9,9,9,9,2,5,9,9,9,9,2,1,2,31,27,1,0.0,2,1,1,2012
1,2581114,55,33340,131,420104,4.45,71458,68787,1.0388,33000,73200,0.4508,177000,2,1,2,2,5,9,9,9,9,2,5,9,9,9,9,2,1,2,62,50,1,0.0,2,1,1,2012
1,2581115,55,11540,87,12901,3.48,76875,69972,1.0987,87000,73600,1.1821,154000,2,1,2,2,5,9,9,9,9,2,5,9,9,9,9,2,1,2,43,48,1,0.0,2,1,1,2012
1,2581116,37,49180,67,3201,20.84,70298,57577,1.2209,191000,62000,3.0806,201000,1,1,2,2,5,9,9,9,9,2,5,9,9,9,9,2,1,2,32,28,1,0.0,2,1,1,2012
1,2581117,48,26420,201,412600,21.78,182540,64179,2.8442,201000,66900,3.0045,396000,2,1,2,2,5,9,9,9,9,2,5,9,9,9,9,2,1,2,41,40,1,0.0,2,1,1,2012
1,2581118,12,45300,103,26102,16.85,52578,57333,0.9171,138000,56400,2.4468,299000,2,1,2,2,6,9,9,9,9,3,6,9,9,9,9,3,3,3,53,59,1,0.0,2,1,1,2012
1,2581119,25,14460,25,110401,68.68,40722,87819,0.4637,41000,92900,0.4413,115000,2,1,1,2,3,9,9,9,9,2,9,9,9,9,9,9,2,4,62,999,1,0.0,2,1,1,2012
1,2581120,49,41620,35,103600,7.79,102614,68010,1.5088,57000,71300,0.7994,115000,2,1,2,2,5,9,9,9,9,2,5,9,9,9,9,2,1,2,61,50,1,0.0,2,1,1,2012
1,2581121,6,31100,37,226700,99.48,27217,66502,0.4093,63000,70100,0.8987,340000,2,1,2,2,5,9,9,9,9,1,5,9,9,9,9,1,1,1,61,59,1,0.0,2,1,1,2012


In [20]:
# Without Arrow, toPandas() collects Row objects to the driver and builds the
# pandas DataFrame from them.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")

# Converting the whole table this way ran out of memory (OOM), so only a subset
# of rows is pulled to the driver. LIMIT without an ordering picks arbitrary rows.
PANDAS_ROW_LIMIT = 1000 * 1000
pandas_df = spark.table("mortgages").limit(PANDAS_ROW_LIMIT).toPandas()

In [21]:
# Enable Apache Arrow for the Spark -> pandas conversion.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Full table: roughly 17x the rows of the 1M-row sample converted without Arrow.
# NOTE(review): the "less than half the time" claim is not backed by a recorded timing in this notebook.
pandas_df = spark.sql('SELECT * FROM mortgages').toPandas() # We just wrote out 17x the data in less than half the time!

### Apache Arrow deep dive

In [23]:
from pyspark.sql.types import *
# `udf` is required by the row-at-a-time UDF cells below (it was never imported);
# `count` was previously listed twice.
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, udf
from pyspark.sql.functions import pandas_udf

In [24]:
# Synthetic benchmark data: `id` buckets consecutive rows into groups of
# ROWS_PER_GROUP, and `v` is uniform random in [0, 1). Seeding `rand` makes reruns
# generate the same values instead of a fresh random column each time.
NUM_ROWS = 10 * 1000 * 1000
ROWS_PER_GROUP = 1000
RAND_SEED = 42

df = (
    spark.range(0, NUM_ROWS)
    .withColumn('id', (col('id') / ROWS_PER_GROUP).cast('integer'))
    .withColumn('v', rand(seed=RAND_SEED))
)

# cache() is lazy; the count() action materializes it so the timings below do not
# include generating the data.
df.cache()
df.count()

In [25]:
# Baseline: time one toPandas() conversion (1 loop, 1 repeat) with Arrow disabled.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")

%timeit -n1 -r1 df.toPandas()

In [26]:
# Same single-run timing with Arrow enabled, for comparison with the previous cell.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

%timeit -n1 -r1 df.toPandas()

## Pandas Vectorized UDFs

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

### Plus one

### PySpark UDF

In [30]:
@udf("double")
def plus_one(v):
    return v + 1

%timeit -n1 -r1 df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()

### Pandas vectorized UDF

In [32]:
@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1

%timeit -n1 -r1 df.select('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()

## Calculating CDF

#### PySpark UDF

In [35]:
import pandas as pd
from scipy import stats

@udf('double')
def cdf(v):
    return float(stats.norm.cdf(v))

%timeit -n1 -r1 df.withColumn('cumulative_probability', cdf(df.v)).agg(count(col('cumulative_probability'))).show()

### Pandas Vectorized UDF

In [37]:
import pandas as pd
from scipy import stats

@pandas_udf('double')
def vectorized_cdf(v):
    return pd.Series(stats.norm.cdf(v))


%timeit -n1 -r1 df.withColumn('cumulative_probability', vectorized_cdf(df.v)).agg(count(col('cumulative_probability'))).show()

### Subtracting the group mean

### PySpark UDF

In [40]:
from pyspark.sql import Row
@udf(ArrayType(df.schema))
def substract_mean(rows):
    vs = pd.Series([r.v for r in rows])
    vs = vs - vs.mean()
    return [Row(id=rows[i]['id'], v=float(vs[i])) for i in range(len(rows))]
  
%timeit -n1 -r1 df.groupby('id').agg(collect_list(struct(df['id'], df['v'])).alias('rows')).withColumn('new_rows', substract_mean(col('rows'))).withColumn('new_row', explode(col('new_rows'))).withColumn('id', col('new_row.id')).withColumn('v', col('new_row.v')).agg(count(col('v'))).show()

### Vectorized UDF

In [42]:
from pyspark.sql.functions import PandasUDFType

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def vectorized_subtract_mean(pdf):
	return pdf.assign(v=pdf.v - pdf.v.mean())

%timeit -n1 -r1 df.groupby('id').apply(vectorized_subtract_mean).agg(count(col('v'))).show()

# 4. Apply ML to the dataset

* [spark-sklearn](https://databricks.com/blog/2016/02/08/auto-scaling-scikit-learn-with-apache-spark.html) for distributed scikit-learn hyperparameter search.
* [Horovod](https://docs.databricks.com/applications/deep-learning/distributed-deep-learning/index.html) for distributed deep learning.