In [1]:
# import dependencies
import shutil

from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

In [2]:
## create spark session
# One Spark session (master "local") that the later cells use through `spark`.
appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"

spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .getOrCreate()
)



In [3]:
# read data source: load the JSON file into `dataframe` and report how many rows were read
json_file_path = './demographic_info.json'
dataframe = spark.read.format("json").load(json_file_path)
row_count = dataframe.count()
print("Rows Read: ", row_count)

Rows Read:  55


In [4]:
#using select query
# Project the columns of interest into `df`; the following cells keep building on `df`.
df = dataframe.select("name", "age", "gender", "isActive", "balance", "company", "eyeColor", "email", "phone")
print("Printing Selected Columns: \n")
df.show()

Printing Selected Coulmns: 

+-----------------+---+------+--------+---------+---------+--------+--------------------+-----------------+
|             name|age|gender|isActive|  balance|  company|eyeColor|               email|            phone|
+-----------------+---+------+--------+---------+---------+--------+--------------------+-----------------+
|      Lowe Larson| 42|  male|    true|$2,411.24| RENOVIZE|   brown|lowelarson@renovi...|+1 (829) 550-2765|
| Daugherty Wooten| 16|  male|    true|$2,407.28|ACRODANCE|   green|daughertywooten@a...|+1 (802) 479-3544|
| Patterson Hooper| 28|  male|   false|$3,350.58| EPLOSION|   green|pattersonhooper@e...|+1 (939) 419-2625|
|      Hood Ortega| 53|  male|    true|$2,147.34|  GRACKER|    blue|hoodortega@gracke...|+1 (880) 575-3417|
|   Fuller Morales| 58|  male|    true|$1,851.82|MANGELICA|   brown|fullermorales@man...|+1 (958) 597-3314|
|     Lela Griffin| 55|female|   false|$1,374.50|   BIFLEX|    blue|lelagriffin@bifle...|+1 (808) 409-2628|

In [5]:
#using filter
# Keep only rows whose isActive equals "true"; rebinding `df` narrows the frame the later cells use.
is_active = df["isActive"] == "true"
df = df.where(is_active)
print("Filter with isActive: \n")
df.show()

Filter with isActive: 

+----------------+---+------+--------+---------+---------+--------+--------------------+-----------------+
|            name|age|gender|isActive|  balance|  company|eyeColor|               email|            phone|
+----------------+---+------+--------+---------+---------+--------+--------------------+-----------------+
|     Lowe Larson| 42|  male|    true|$2,411.24| RENOVIZE|   brown|lowelarson@renovi...|+1 (829) 550-2765|
|Daugherty Wooten| 16|  male|    true|$2,407.28|ACRODANCE|   green|daughertywooten@a...|+1 (802) 479-3544|
|     Hood Ortega| 53|  male|    true|$2,147.34|  GRACKER|    blue|hoodortega@gracke...|+1 (880) 575-3417|
|  Fuller Morales| 58|  male|    true|$1,851.82|MANGELICA|   brown|fullermorales@man...|+1 (958) 597-3314|
|    Harmon Giles| 34|  male|    true|$3,294.72|   VETRON|   green|harmongiles@vetro...|+1 (957) 417-3607|
|   Fisher Mathis| 45|  male|    true|$1,546.40|   ZAPPIX|    blue|fishermathis@zapp...|+1 (863) 435-3679|
| Sharlene Be

In [6]:
#Adding new column Age_Group
# Bucket age into Teenager [13, 20), Adult [20, 40), Old [40, inf); anything else
# (younger than 13, or no matching branch) is labelled "NA".
age_col = df["age"]
age_group_expr = (
    when((age_col >= 13) & (age_col < 20), lit("Teenager"))
    .when((age_col >= 20) & (age_col < 40), lit("Adult"))
    .when(age_col >= 40, lit("Old"))
    .otherwise(lit("NA"))
)
df = df.withColumn("Age_Group", age_group_expr)
print("Output with new column Age_Group")
df.show(n=10, truncate=False)

Output with new column Age_Group
+----------------+---+------+--------+---------+---------+--------+-----------------------------+-----------------+---------+
|name            |age|gender|isActive|balance  |company  |eyeColor|email                        |phone            |Age_Group|
+----------------+---+------+--------+---------+---------+--------+-----------------------------+-----------------+---------+
|Lowe Larson     |42 |male  |true    |$2,411.24|RENOVIZE |brown   |lowelarson@renovize.com      |+1 (829) 550-2765|Old      |
|Daugherty Wooten|16 |male  |true    |$2,407.28|ACRODANCE|green   |daughertywooten@acrodance.com|+1 (802) 479-3544|Teenager |
|Hood Ortega     |53 |male  |true    |$2,147.34|GRACKER  |blue    |hoodortega@gracker.com       |+1 (880) 575-3417|Old      |
|Fuller Morales  |58 |male  |true    |$1,851.82|MANGELICA|brown   |fullermorales@mangelica.com  |+1 (958) 597-3314|Old      |
|Harmon Giles    |34 |male  |true    |$3,294.72|VETRON   |green   |harmongiles@vetron

In [7]:
# Top customer per gender by balance, using Spark SQL over a temp view of `df`.
df.createOrReplaceTempView("EMP")
# balance is a currency string such as "$2,411.24": strip "$" and "," and cast to DOUBLE so the
# window orders numerically rather than lexicographically (as text, "$999.00" > "$3,601.90").
top_by_balance_query = """
    SELECT name, age, gender, isActive, balance, company, eyeColor, email, phone
    FROM (
        SELECT *,
               row_number() OVER (
                   PARTITION BY gender
                   ORDER BY CAST(regexp_replace(balance, '[$,]', '') AS DOUBLE) DESC
               ) AS rn
        FROM EMP
    ) tmp
    WHERE rn = 1
"""
spark.sql(top_by_balance_query).show()


+-------------+---+------+--------+---------+-------+--------+--------------------+-----------------+
|         name|age|gender|isActive|  balance|company|eyeColor|               email|            phone|
+-------------+---+------+--------+---------+-------+--------+--------------------+-----------------+
|   Naomi Kemp| 17|female|    true|$3,601.90|EXOBLUE|   green|naomikemp@exoblue...|+1 (965) 465-2006|
|Griffin Carey| 47|  male|    true|$3,330.34| OBONES|   green|griffincarey@obon...|+1 (869) 531-3076|
+-------------+---+------+--------+---------+-------+--------+--------------------+-----------------+



In [8]:
#maximum: customer with the highest balance for each gender
from pyspark.sql.window import Window
from pyspark.sql.functions import col, regexp_replace, row_number

# balance is a currency string such as "$2,411.24": strip "$" and "," and cast to double so the
# window orders numerically rather than lexicographically (as text, "$999.00" > "$3,601.90").
balance_amount = regexp_replace(col("balance"), "[$,]", "").cast("double")
w = Window.partitionBy("gender").orderBy(balance_amount.desc())
print("Top Male & Female on the basis of Balance")
(
    df.withColumn("row", row_number().over(w))
    .filter(col("row") == 1)
    .drop("row")
    .show()
)


+-------------+---+------+--------+---------+-------+--------+--------------------+-----------------+---------+
|         name|age|gender|isActive|  balance|company|eyeColor|               email|            phone|Age_Group|
+-------------+---+------+--------+---------+-------+--------+--------------------+-----------------+---------+
|   Naomi Kemp| 17|female|    true|$3,601.90|EXOBLUE|   green|naomikemp@exoblue...|+1 (965) 465-2006| Teenager|
|Griffin Carey| 47|  male|    true|$3,330.34| OBONES|   green|griffincarey@obon...|+1 (869) 531-3076|      Old|
+-------------+---+------+--------+---------+-------+--------+--------------------+-----------------+---------+



In [9]:
#second maximum: customer with the second-highest balance for each gender
from pyspark.sql.window import Window
from pyspark.sql.functions import col, regexp_replace, row_number

# balance is a currency string such as "$2,411.24": strip "$" and "," and cast to double so the
# window orders numerically rather than lexicographically (as text, "$999.00" > "$3,601.90").
balance_amount = regexp_replace(col("balance"), "[$,]", "").cast("double")
w = Window.partitionBy("gender").orderBy(balance_amount.desc())
print("Top Second Male & Female on the basis of Balance")
(
    df.withColumn("row", row_number().over(w))
    .filter(col("row") == 2)
    .drop("row")
    .show()
)


Top Second Male & Female on the basis of Balance
+--------------+---+------+--------+---------+-------+--------+--------------------+-----------------+---------+
|          name|age|gender|isActive|  balance|company|eyeColor|               email|            phone|Age_Group|
+--------------+---+------+--------+---------+-------+--------+--------------------+-----------------+---------+
|Addie Mckinney| 29|female|    true|$3,501.79|MARVANE|   brown|addiemckinney@mar...|+1 (964) 419-2240|    Adult|
|  Harmon Giles| 34|  male|    true|$3,294.72| VETRON|   green|harmongiles@vetro...|+1 (957) 417-3607|    Adult|
+--------------+---+------+--------+---------+-------+--------+--------------------+-----------------+---------+



In [10]:
#create temp view table over the full source data and query the active records
# createOrReplaceTempView (not createTempView) so re-running this cell does not fail
# because "sample_data_view" is already registered in the session.
dataframe.createOrReplaceTempView("sample_data_view")
spark.sql('''
    SELECT _id, name, age, gender, isActive, balance, company, eyeColor, email, phone
    FROM sample_data_view
    WHERE isActive == "true"
    ''').show(10, False)

+------------------------+----------------+---+------+--------+---------+---------+--------+-----------------------------+-----------------+
|_id                     |name            |age|gender|isActive|balance  |company  |eyeColor|email                        |phone            |
+------------------------+----------------+---+------+--------+---------+---------+--------+-----------------------------+-----------------+
|6189fc14820ae399fbf0e72e|Lowe Larson     |42 |male  |true    |$2,411.24|RENOVIZE |brown   |lowelarson@renovize.com      |+1 (829) 550-2765|
|6189fc149e47c242d283481f|Daugherty Wooten|16 |male  |true    |$2,407.28|ACRODANCE|green   |daughertywooten@acrodance.com|+1 (802) 479-3544|
|6189fc1443ea95f1b7cfb74e|Hood Ortega     |53 |male  |true    |$2,147.34|GRACKER  |blue    |hoodortega@gracker.com       |+1 (880) 575-3417|
|6189fc14ec90e8b6164151d5|Fuller Morales  |58 |male  |true    |$1,851.82|MANGELICA|brown   |fullermorales@mangelica.com  |+1 (958) 597-3314|
|6189fc14cc63

In [11]:
#convert to rdd and write it out as text files
RDD_OUTPUT_DIR = "./rdd_text_new"
rdd_object = df.rdd
# saveAsTextFile refuses to write into an existing output directory, so remove the output of a
# previous run first; this keeps the cell (and Restart & Run All) re-runnable.
shutil.rmtree(RDD_OUTPUT_DIR, ignore_errors=True)
rdd_object.saveAsTextFile(RDD_OUTPUT_DIR)

In [12]:
#end spark session: this is the notebook's last cell; `spark` should not be used after it runs
spark.stop()