In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-1")\
    .config("spark.yarn.access.hadoopFileSystems","s3a://demo-aws-1/")\
    .config("spark.hadoop.yarn.resourcemanager.principal",os.getenv("HADOOP_USER_NAME"))\
    .config("spark.executor.instances", 4)\
    .config("spark.executor.cores", 4)\
    .getOrCreate()
#.config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-2")\

In [3]:
df = spark.read.option('inferschema','true').csv(
  "s3a://demo-aws-1/user/pauldefusco/LoanStats_2015_subset.csv",
  header=True,
  sep=',',
  nullValue='NA'
)

In [4]:
#Printing number of rows and columns:
print('Dataframe Shape')
print((df.count(), len(df.columns)))

Dataframe Shape
(421097, 105)


In [5]:
#Count number of nulls for each column:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()

Unnamed: 0,acc_now_delinq,acc_open_past_24mths,addr_state,all_util,annual_inc,annual_inc_joint,application_type,avg_cur_bal,bc_open_to_buy,bc_util,...,sec_app_earliest_cr_line,sec_app_inq_last_6mths,sec_app_mort_acc,sec_app_open_acc,sec_app_revol_util,sec_app_open_act_il,sec_app_num_rev_accts,sec_app_chargeoff_within_12_mths,sec_app_collections_12_mths_ex_med,sec_app_mths_since_last_major_derog
0,3,3,1,399725,3,420586,1,3,3966,4230,...,421097,421097,421097,421097,421097,421097,421097,421097,421097,421097


Register Spark DF as Spark SQL Table

In [6]:
spark.sql("show databases").show()

+--------------------+
|        databaseName|
+--------------------+
|  airline_ontime_orc|
|airline_ontime_pa...|
|             default|
|  information_schema|
|       prescribing_o|
|       prescribing_p|
|     prescribing_p_e|
|          retaildemo|
|         shlomi_test|
|                 sys|
|                test|
|             test_dl|
|           usecase01|
+--------------------+



In [7]:
df.write.format('parquet').mode("overwrite").saveAsTable('default.ml_ops_demo_new')

In [8]:
new_df = spark.sql("SELECT * FROM default.ml_ops_demo_new LIMIT 10")

In [9]:
new_df.write.format('parquet').mode("overwrite").saveAsTable('default.ml_ops_demo_new_limit')

In [11]:
#pd.read_csv('data/x.csv')

In [12]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|             cc_data|      false|
| default|               depts|      false|
| default|            dex_test|      false|
| default|                emps|      false|
| default|     lc_simpl_scores|      false|
| default|            lc_smote|      false|
| default|   lc_smote_complete|      false|
| default|         lc_smote_k2|      false|
| default|         lc_smote_k3|      false|
| default|            lc_table|      false|
| default|               micro|      false|
| default|         ml_ops_demo|      false|
| default|   ml_ops_demo_limit|      false|
| default|     ml_ops_demo_new|      false|
| default|ml_ops_demo_new_l...|      false|
| default|         telco_churn|      false|
| default|             testcml|      false|
| default|            test_orc|      false|
+--------+--------------------+-----------+

