In [181]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [182]:
# spark is an existing SparkSession
df = spark.read.json("/Users/sravanthi/Documents/Sandbox/spark-2.4.4-bin-hadoop2.7/python/test_support/sql/people.json")


In [183]:
# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [184]:
# df.printSchema()

from pyspark.sql.functions import when   

df = df.withColumn('Name', 
                     F.when(df['name'] == "Michael", "MICHAEL")
                     .when(df['name'] == "Andy", "ANDY")
                     .otherwise(0))
# newdf = df.withColumn('name', F.when(df['name'] == "Andy", "ANDY"))
# newdf = df.withColumn('name', F.when(df['name'] == "Justin", "JUSTIN"))

In [185]:
df.rdd.getNumPartitions()

1

In [186]:
df.show()

+----+-------+
| age|   Name|
+----+-------+
|null|MICHAEL|
|  30|   ANDY|
|  19|      0|
+----+-------+



In [9]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [10]:
df.count()

3

In [11]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [12]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [13]:
# Select people older than 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [14]:
# Count people by age
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [15]:
#Running SQL Queries Programmatically

In [16]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [33]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("/Users/sravanthi/Documents/Sandbox/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
print(people)
# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Michael, 29
# Andy, 30
# Justin, 19


PythonRDD[113] at RDD at PythonRDD.scala:53


In [30]:
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT * FROM people")

results.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



### Excercise - Mental Health Tech Survey :)

In [187]:
surveyDf = spark.read.options(header="true",\
                              inferSchema="true",\
                              nullValue="NA",\
                              timestampFormat="yyyy-MM-dd'T'HH:mm:ss",\
                              mode="failfast")\
                             .csv("/Users/sravanthi/Documents/Sandbox/spark/survey.csv")

In [188]:
surveyDf.count()

1259

In [189]:
surveyDf.summary()

DataFrame[summary: string, Timestamp: string, Age: string, Gender: string, Country: string, state: string, self_employed: string, family_history: string, treatment: string, work_interfere: string, no_employees: string, remote_work: string, tech_company: string, benefits: string, care_options: string, wellness_program: string, seek_help: string, anonymity: string, leave: string, mental_health_consequence: string, phys_health_consequence: string, coworkers: string, supervisor: string, mental_health_interview: string, phys_health_interview: string, mental_vs_physical: string, obs_consequence: string, comments: string]

> 1. Calculate and display the count of people (gender wise) who have taken treatment for mental health condition.
> 2. What is the average age of participants (gender wise) from the country "Canada"?
> 3. What is the youngest age of participant from United Kingdom?
> 4. List the count of participants who work for tech company , country wise?
> 5. List the top 10 participants by age who had family history of mental health issues and had taken the treament.

In [190]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when   

In [191]:
surveyDf = surveyDf.withColumn("Gender",
                                F.when(surveyDf["Gender"] == "Cis Female", "Female")
                                .when(surveyDf["Gender"] == "cis-female/femme", "Female")
                                .when(surveyDf["Gender"] == "Woman", "Female")
                                .when(surveyDf["Gender"] == "F", "Female")
                                .when(surveyDf["Gender"] == "Femake", "Female")
                                .when(surveyDf["Gender"] == "femail", "Female")
                                .when(surveyDf["Gender"] == "Female (cis)", "Female")
                                .when(surveyDf["Gender"] == "female", "Female")
                                .when(surveyDf["Gender"] == "Cis Male", "Male")
                                .when(surveyDf["Gender"] == "Cis Man", "Male")
                                .when(surveyDf["Gender"] == "Guy (-ish) ^_^", "Male")
                                .when(surveyDf["Gender"] == "M", "Male")
                                .when(surveyDf["Gender"] == "Mail", "Male")
                                .when(surveyDf["Gender"] == "maile", "Male")
                                .when(surveyDf["Gender"] == "Make", "Male")
                                .when(surveyDf["Gender"] == "Mal", "Male")
                                .when(surveyDf["Gender"] == "Male (CIS)", "Male")
                                .when(surveyDf["Gender"] == "Male-ish", "Male")
                                .when(surveyDf["Gender"] == "Malr", "Male")
                                .when(surveyDf["Gender"] == "Man", "Male")
                                .when(surveyDf["Gender"] == "msle", "Male")
                                .when(surveyDf["Gender"] == "Male", "Male")
                                .when(surveyDf["Gender"] == "male", "Male")
                                .otherwise("No gender")
                                
                                )
          

In [220]:
# Register the DataFrame as a SQL temporary view
surveyDf.createOrReplaceTempView("surveydf")
# spark.sql("select unique(gender) from surveydf")
sqlsurveyDf = spark.sql('select gender,count(gender) from surveyDf where mental_health_consequence == "Yes" and treatment = "Yes" group by gender ')
sqlsurveyDf.show()
                        

+---------+-------------+
|   gender|count(gender)|
+---------+-------------+
|No gender|           35|
|   Female|           24|
|     Male|          115|
+---------+-------------+



In [218]:
# What is the average age of participants (gender wise) from the country "Canada"?

sqlsurveyDf1 = spark.sql('select gender, Avg(Age) as avg_age from surveyDf where Country == "Canada" group by gender ')
sqlsurveyDf1.show()


+---------+------------------+
|   gender|           avg_age|
+---------+------------------+
|No gender|             30.25|
|   Female|              28.6|
|     Male|29.181818181818183|
+---------+------------------+



In [210]:
# What is the youngest age of participant from United Kingdom?

sqlsurveyDf2 = spark.sql('select Country,Age from surveyDf where Country == "United Kingdom" and Age > 0 order by Age limit 1 ')
sqlsurveyDf2.show()

+--------------+---+
|       Country|Age|
+--------------+---+
|United Kingdom| 18|
+--------------+---+



In [217]:
# List the count of participants who work for tech company , country wise?

sqlsurveyDf3 = spark.sql('select country, count(*) as Count_participants  from surveyDf where tech_company == "Yes" group by country')
sqlsurveyDf3.show()

+-------------+------------------+
|      country|Count_participants|
+-------------+------------------+
|       Russia|                 3|
|       Sweden|                 7|
|  Philippines|                 1|
|    Singapore|                 4|
|      Germany|                40|
|       France|                12|
|       Greece|                 2|
|      Belgium|                 3|
|      Finland|                 2|
|United States|               611|
|        India|                 9|
|        China|                 1|
|      Croatia|                 2|
|      Nigeria|                 1|
|        Italy|                 5|
|       Norway|                 1|
|        Spain|                 1|
|      Denmark|                 2|
|      Ireland|                26|
|     Thailand|                 1|
+-------------+------------------+
only showing top 20 rows



In [224]:
# List the top 10 participants by age who had family history of mental health issues and had taken the treament.
 
sqlsurveyDf3 = spark.sql('select age from surveyDf where family_history == "Yes" and mental_health_consequence == "Yes" and treatment = "Yes" order by age ')
sqlsurveyDf3.show()


+---+
|age|
+---+
| -1|
|  8|
| 21|
| 22|
| 22|
| 23|
| 23|
| 24|
| 24|
| 24|
| 24|
| 25|
| 25|
| 25|
| 25|
| 25|
| 25|
| 25|
| 25|
| 26|
+---+
only showing top 20 rows

