#### Get the Spark Session

In [2]:
from pyspark.sql import SparkSession

# Build a Spark session named "Telco Data Set", or reuse one that already exists.
# The "local[*]" master runs Spark in-process using all available cores.
spark = (
    SparkSession.builder
    .appName("Telco Data Set")
    .master("local[*]")
    .getOrCreate()
)

#### Get the root path

In [3]:
# Shell escape: print the current working directory of the notebook session.
! pwd

/home/cdsw/01_Basics


#### Read the local CSV data files (the ones downloaded from Git)

In [4]:
# Load the local CSV into a DataFrame: the first row is the header and fields
# are comma-separated. PERMISSIVE mode nulls out malformed fields instead of
# failing the read. No schema is supplied or inferred, so columns load as strings.
df_csv = spark.read.csv(
    "/home/cdsw/raw/multiTimeline.csv",
    header=True,
    sep=",",
    mode="PERMISSIVE",
)

#### Save Data to Parquet in ADLS/S3

In [7]:
# Create the demo directory in ADLS Gen2; the -D flag asks the ABFS driver to
# create the container too if it does not exist yet.
# -p keeps the cell re-runnable when the directory already exists, and the
# abfss:// scheme matches the one used by the Spark read/write cells.
! hadoop fs -Dfs.azure.createRemoteFileSystemDuringInitialization=true -mkdir -p abfss://data@mkleinazurecdpenv.dfs.core.windows.net/vionescu-demo/

Jun 26, 2020 10:18:17 AM org.apache.knox.gateway.shell.KnoxSession createClient
INFO: Using default JAAS configuration
20/06/26 10:18:18 ERROR common.DefaultRequestExecutor: Error executing request: org.apache.knox.gateway.shell.ErrorResponse: https://ps-sandbox-dl-20jun-idbroker1.ps-sandb.a465-9q4k.cloudera.site:8444//gateway/azure-cab/cab/api/v1/credentials: HTTP/1.1 403 Forbidden
^C


In [8]:
# Persist the CSV data as Parquet in ADLS Gen2. mode="overwrite" replaces any
# existing output so the cell can be re-run; the default mode raises an error
# when the target path already exists.
df_csv.write.parquet("abfss://data@mkleinazurecdpenv.dfs.core.windows.net/vionescu-demo/multiTimeline", mode="overwrite")

In [14]:
# Persist the same DataFrame as Parquet in S3, replacing any existing output at that path.
df_csv.write.mode("overwrite").parquet("s3a://cdp-sandbox-default-ps/datalake/vionescu-demo/multiTimeline")

#### Read Data from ADLS/S3

In [9]:
# Load the Parquet dataset stored in ADLS Gen2 back into a DataFrame.
df_parquet = spark.read.format("parquet").load("abfss://data@mkleinazurecdpenv.dfs.core.windows.net/vionescu-demo/multiTimeline")

In [15]:
# Load the Parquet dataset stored in S3. Note that this rebinds df_parquet, so
# the following cells operate on the S3 copy.
df_parquet = spark.read.format("parquet").load("s3a://cdp-sandbox-default-ps/datalake/vionescu-demo/multiTimeline")

In [16]:
# Print the schema (column names, types, nullability) of the Parquet-backed DataFrame.
df_parquet.printSchema()

root
 |-- Month: string (nullable = true)
 |-- Cupcake: string (nullable = true)



In [17]:
# Preview the first 10 rows as a text table.
df_parquet.show(n=10)

+-------+-------+
|  Month|Cupcake|
+-------+-------+
|2004-01|      6|
|2004-02|      6|
|2004-03|      5|
|2004-04|      5|
|2004-05|      6|
|2004-06|      6|
|2004-07|      5|
|2004-08|      6|
|2004-09|      6|
|2004-10|     11|
+-------+-------+
only showing top 10 rows



#### Read more complex data from a CSV data file stored in S3.

In [22]:
# Load the Telco customer-churn CSV from S3: the first row is the header and
# fields are comma-separated. PERMISSIVE mode nulls out malformed fields instead
# of failing the read. No schema is supplied or inferred, so columns load as strings.
df2 = spark.read.csv(
    "s3a://cdp-sandbox-default-ps/datalake/vionescu-demo/WA_Fn-UseC_-Telco-Customer-Churn-.csv",
    header=True,
    sep=",",
    mode="PERMISSIVE",
)

In [23]:
# Print the schema (column names, types, nullability) of the churn DataFrame.
df2.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [24]:
# Preview the first 10 rows without truncating long cell values.
df2.show(n=10, truncate=False)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines   |InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|7590-VHVEG|Female|0            |Yes    |No        |1     |No          |No phone service|DSL            |No            |Yes         |No              |N

#### Pretty Print It

In [25]:
# Bring at most 10 rows to the driver as a pandas DataFrame and display its
# first 5 rows using pandas' table rendering.
df2.limit(10).toPandas().head(n=5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes


#### Write to Hive

In [None]:
# Create the target database; "if not exists" makes the cell safe to re-run
# instead of raising when the database is already present.
spark.sql("create database if not exists vionescu_demo")

In [27]:
# Save the churn data as a table in the vionescu_demo database.
# mode="overwrite" replaces an existing table so the cell can be re-run;
# the default mode raises an error when the table already exists.
df2.write.saveAsTable("vionescu_demo.customer_churn", mode="overwrite")

In [28]:
# Load the saved table back into a DataFrame through Spark SQL.
df3 = spark.sql(
    "select * from vionescu_demo.customer_churn"
)

In [29]:
# Print the schema (column names, types, nullability) of the DataFrame read from the table.
df3.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [30]:
# Bring at most 10 rows of the table to the driver as a pandas DataFrame and
# display its first 5 rows using pandas' table rendering.
df3.limit(10).toPandas().head(n=5)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,...,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No
1,5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,...,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No
2,3668-QPYBK,Male,0,No,No,2,Yes,No,DSL,Yes,...,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,Yes
3,7795-CFOCW,Male,0,No,No,45,No,No phone service,DSL,Yes,...,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,No
4,9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,...,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes
