In [20]:
import pyspark

In [21]:
from pyspark.sql import SparkSession

In [22]:
spark = SparkSession.builder.appName('Basic').getOrCreate()

In [23]:
df = spark.read.csv("/home/maq/Datasets/customer_churn.csv")

In [24]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [25]:
df = spark.read.csv("/home/maq/Datasets/customer_churn.csv", header = True)
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: string (nullable = true)



In [26]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [27]:
df.describe()

                                                                                

DataFrame[summary: string, Names: string, Age: string, Total_Purchase: string, Account_Manager: string, Years: string, Num_Sites: string, Onboard_date: string, Location: string, Company: string, Churn: string]

In [28]:
from pyspark.sql.types import (StructField, 
                               StringType,
                               IntegerType,
                               FloatType, 
                               DoubleType,
                               DecimalType,
                               BooleanType,
                               StructType)

In [29]:
def schemaBuilder(columns):                
    data_schema = []
    for i in columns:
        whatis = input(i + " is int, float, double, decimal, bool, string :")
        if (whatis == 'float'):
            dataType = FloatType()
        elif (whatis == 'bool'):
            dataType = BooleanType()
        elif (whatis == 'double'):
            dataType = DoubleType()
        elif (whatis == 'int'):
            dataType = IntegerType()
        elif (whatis == 'decimal'):
            dataType = DecimalType()
        else:
            dataType = StringType()
        
        data_schema.append(StructField(i, dataType, True))
        
    return data_schema

In [30]:
data_schema = schemaBuilder(df.columns)

Names is int, float, double, decimal, bool, string :string
Age is int, float, double, decimal, bool, string :string 
Total_Purchase is int, float, double, decimal, bool, string :
Account_Manager is int, float, double, decimal, bool, string :
Years is int, float, double, decimal, bool, string :
Num_Sites is int, float, double, decimal, bool, string :
Onboard_date is int, float, double, decimal, bool, string :
Location is int, float, double, decimal, bool, string :
Company is int, float, double, decimal, bool, string :
Churn is int, float, double, decimal, bool, string :


In [31]:
final_struct = StructType(fields = data_schema)

# df = spark.read.json('persons.json', schema = final_struct)

In [32]:
df = spark.read.csv("/home/maq/Datasets/customer_churn.csv", header = True, schema = final_struct)
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: string (nullable = true)



In [33]:
type(df['age'])

pyspark.sql.column.Column

In [34]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [35]:
type(df.head(2))

list

In [36]:
df.head(2)

[Row(Names='Cameron Williams', Age='42.0', Total_Purchase='11066.8', Account_Manager='0', Years='7.22', Num_Sites='8.0', Onboard_date='2013-08-30 07:00:40', Location='10265 Elizabeth Mission', Company=None, Churn=None),
 Row(Names='Barkerburgh', Age=' AK 89518"', Total_Purchase='Harvey LLC', Account_Manager='1', Years=None, Num_Sites=None, Onboard_date=None, Location=None, Company=None, Churn=None)]

In [37]:
df.head(2)[1]

Row(Names='Barkerburgh', Age=' AK 89518"', Total_Purchase='Harvey LLC', Account_Manager='1', Years=None, Num_Sites=None, Onboard_date=None, Location=None, Company=None, Churn=None)

In [40]:
df.select(['Names', 'age']).show()

+-------------------+---------------+
|              Names|            age|
+-------------------+---------------+
|   Cameron Williams|           42.0|
|        Barkerburgh|      AK 89518"|
|      Kevin Mueller|           41.0|
|        Carloshaven|      RI 17756"|
|        Eric Lozano|           38.0|
|        Alyssahaven|      DE 90114"|
|      Phillip White|           42.0|
|         Angelabury| WY 30645-4695"|
|     Cynthia Norton|           37.0|
|         Karenshire|      MH 71730"|
|   Jessica Williams|           48.0|
|East Vincentborough|      PR 74359"|
|        Eric Butler|           44.0|
|        West Justin| IA 87713-3460"|
|      Zachary Walsh|           32.0|
|          Brownport| FM 59852-6150"|
|        Ashlee Carr|           43.0|
|South Christineview|      MA 82059"|
|     Jennifer Lynch|           40.0|
|          South Ann| WI 51655-7561"|
+-------------------+---------------+
only showing top 20 rows



In [45]:
df.withColumn('newcolumn', df['age']+1).printSchema()
# Not at inplace operation

root
 |-- Names: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- newcolumn: double (nullable = true)



In [46]:
df.withColumnRenamed('Names', 'name').printSchema()

root
 |-- name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Total_Purchase: string (nullable = true)
 |-- Account_Manager: string (nullable = true)
 |-- Years: string (nullable = true)
 |-- Num_Sites: string (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: string (nullable = true)



In [47]:
df.createOrReplaceTempView('people')

In [48]:
results = spark.sql('SELECT * FROM people')

In [49]:
results

DataFrame[Names: string, Age: string, Total_Purchase: string, Account_Manager: string, Years: string, Num_Sites: string, Onboard_date: string, Location: string, Company: string, Churn: string]

In [56]:
results.groupby('Account_Manager').count().show()

+---------------+-----+
|Account_Manager|count|
+---------------+-----+
|              0| 1128|
|              1|  565|
|           null|  107|
+---------------+-----+



In [57]:
results = spark.sql('SELECT * FROM people where Account_Manager=1')

In [59]:
results.groupby('Account_Manager').count().show()

+---------------+-----+
|Account_Manager|count|
+---------------+-----+
|              1|  565|
+---------------+-----+

