In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

# Build (or reuse) the Hive-enabled YARN session for this notebook; the SQL
# warehouse is placed under the current user's HDFS home directory.
spark = (
    SparkSession.builder
    .appName("itv023333")
    .master("yarn")
    .config("spark.ui.port", "0")
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# Display the active SparkSession object as a quick sanity check of the setup cell.
spark

In [3]:
from pyspark.sql.types import *
# Explicit schema for the users JSON: scalar user fields, a list of phone
# numbers, and a nested address struct. Every field is declared nullable.
schema = StructType(
    [
        StructField("user_id", LongType(), True),
        StructField("user_first_name", StringType(), True),
        StructField("user_last_name", StringType(), True),
        StructField("user_email", StringType(), True),
        StructField("user_gender", StringType(), True),
        StructField("user_phone_numbers", ArrayType(StringType()), True),
        StructField(
            "user_address",
            StructType(
                [
                    StructField("street", StringType(), True),
                    StructField("city", StringType(), True),
                    StructField("state", StringType(), True),
                    StructField("postal_code", StringType(), True),
                ]
            ),
            True,
        ),
    ]
)

In [4]:
# Load the users JSON using the explicit schema above instead of inferring one.
base_df = spark.read.schema(schema).json("/public/sms/users")

In [5]:
# Confirm the explicit schema was applied to the loaded data.
base_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- user_first_name: string (nullable = true)
 |-- user_last_name: string (nullable = true)
 |-- user_email: string (nullable = true)
 |-- user_gender: string (nullable = true)
 |-- user_phone_numbers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_address: struct (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- postal_code: string (nullable = true)



In [6]:
from pyspark.sql.functions import *

# Flatten the nested address struct into top-level columns, replace the
# phone-number array with its element count, and register the result as the
# temp view `users_vw` that the SQL cells below query.
#
# NOTE: in users_vw, `user_phone_numbers` is the *number* of phone numbers, not
# the array itself. With Spark's default settings size(NULL) returns -1 rather
# than NULL. A missing array is therefore mapped to NULL explicitly, so that
# `user_phone_numbers IS NULL / IS NOT NULL` filters actually mean something.
(
    base_df
    .withColumn("street", col("user_address.street"))
    .withColumn("city", col("user_address.city"))
    .withColumn("state", col("user_address.state"))
    .withColumn("postal_code", col("user_address.postal_code"))
    .withColumn(
        "user_phone_numbers",
        # when() without otherwise() yields NULL for rows whose array is NULL.
        when(col("user_phone_numbers").isNotNull(), size(col("user_phone_numbers"))),
    )
    .createOrReplaceTempView("users_vw")
)

In [7]:
# Number of partitions Spark created when reading the users JSON (3 in the recorded run).
base_df.rdd.getNumPartitions()

3

a. total number of records in the dataframe

In [8]:
# a. Total number of records loaded from the users JSON.
base_df.count()

1000000

In [9]:
# Peek at a handful of rows from the flattened view.
spark.table("users_vw").limit(5)

user_id,user_first_name,user_last_name,user_email,user_gender,user_phone_numbers,user_address,street,city,state,postal_code
700001,North,Broxholme,nbroxholme0@apach...,Male,3,{81 Riverside Par...,81 Riverside Parkway,Newark,New Jersey,7188
700002,Allan,Burree,aburree1@gov.uk,Male,3,{109 Marquette St...,109 Marquette Street,Tampa,Florida,33625
700003,Marga,Hertwell,mhertwell2@flavor...,Female,4,"{79158 Acker Way,...",79158 Acker Way,Albuquerque,New Mexico,87105
700004,Fran,Snalum,fsnalum3@tuttocit...,Female,5,{0 Lillian Parkwa...,0 Lillian Parkway,Scranton,Pennsylvania,18514
700005,Tarrah,Asty,tasty4@netlog.com,Female,2,{60 Butternut Par...,60 Butternut Park,San Antonio,Texas,78245


b. how many users are from the state New York

In [10]:
# b. Number of distinct users whose address state is New York.
spark.sql("""
    SELECT COUNT(DISTINCT user_id) AS distinct_users
    FROM users_vw
    WHERE state = 'New York'
""")

distinct_users
49576


In [11]:
# Sample of up to ten users whose address city is Dallas.
spark.sql("""
    SELECT *
    FROM users_vw
    WHERE city = 'Dallas'
    LIMIT 10
""")

user_id,user_first_name,user_last_name,user_email,user_gender,user_phone_numbers,user_address,street,city,state,postal_code
200001,Eirena,Cutsforth,ecutsforth0@wisc.edu,Female,4,"{8 Warrior Drive,...",8 Warrior Drive,Dallas,Texas,75358
200027,Amalee,Cicculini,acicculiniq@frien...,Female,2,{59564 Weeping Bi...,59564 Weeping Bir...,Dallas,Texas,75372
200032,Bancroft,O'Mullally,bomullallyv@tiny.cc,Male,4,{1087 Quincy Cour...,1087 Quincy Court,Dallas,Texas,75205
200125,Denver,De Bruijne,ddebruijne3g@yaho...,Male,3,{1201 Acker Terra...,1201 Acker Terrace,Dallas,Texas,75251
200127,Tab,Lancastle,tlancastle3i@com.com,Male,2,"{20 Elka Road, Da...",20 Elka Road,Dallas,Texas,75310
200142,Candi,Glencrash,cglencrash3x@thea...,Female,1,{15 Burning Wood ...,15 Burning Wood S...,Dallas,Texas,75387
200151,Meriel,Doll,mdoll46@hud.gov,Female,4,{834 Redwing Cour...,834 Redwing Court,Dallas,Texas,75210
200160,Timmy,Georgot,tgeorgot4f@latime...,Female,1,{602 Summit Court...,602 Summit Court,Dallas,Texas,75260
200182,Sayre,Larimer,slarimer51@vkonta...,Male,3,"{5 Loeprich Pass,...",5 Loeprich Pass,Dallas,Texas,75358
200290,Alyosha,Cudbertson,acudbertson81@for...,Male,1,{6168 Summerview ...,6168 Summerview Park,Dallas,Texas,75397


c. which state has the maximum number of distinct postal codes

In [12]:
# c. State with the most distinct postal codes. The alias is kept as
# `max_postcode` to match the recorded output; ties resolve arbitrarily (LIMIT 1).
spark.sql("""
    SELECT COUNT(DISTINCT postal_code) AS max_postcode,
           state
    FROM users_vw
    GROUP BY state
    ORDER BY max_postcode DESC
    LIMIT 1
""")

max_postcode,state
206,California


d. which city has the most users


In [13]:
# d. City with the most distinct users; rows without a city are ignored.
spark.sql("""
    SELECT city,
           COUNT(DISTINCT user_id) AS no_distinct_users
    FROM users_vw
    WHERE city IS NOT NULL
    GROUP BY city
    ORDER BY no_distinct_users DESC
    LIMIT 1
""")

city,no_distinct_users
Washington,28504


e. how many users have email domain as bizjournals.com

In [14]:
# e. Users whose e-mail domain is exactly bizjournals.com.
# Anchoring the pattern on "@" stops look-alike domains (e.g. "xbizjournals.com")
# from being counted. Domain names are case-insensitive, so the address is
# lower-cased first. DISTINCT matches how users are counted in the other questions.
spark.sql("""
select count(distinct user_id) as users
from users_vw
where lower(user_email) like '%@bizjournals.com'
""")

users
2015


f. how many users have 4 phone numbers mentioned


In [15]:
# f. Users with exactly 4 phone numbers. In users_vw, `user_phone_numbers` holds
# size() of the phone-number array, so it is compared with the integer 4 rather
# than the string '4', which only worked through an implicit cast.
spark.sql("""
select count(distinct user_id) as no_of_users
from users_vw
where user_phone_numbers = 4
""")

no_of_users
179041


g. how many users do not have any phone number mentioned

In [16]:
# g. Users without any phone number. In users_vw, `user_phone_numbers` is
# size(user_phone_numbers). With Spark's default settings size(NULL) returns -1
# rather than NULL, so `IS NULL` alone can never match (likely why the recorded
# result is 0). A NULL count, -1 (missing array) and 0 (empty array) all mean
# "no phone number".
spark.sql("""
select count(distinct user_id) as no_of_users
from users_vw
where user_phone_numbers is null or user_phone_numbers <= 0
""")

no_of_users
0


Question 3 \
Write the data from the base dataframe as it is to the disk, but write in parquet
format. \
Observe the number of files created, also the size of files.

In [18]:
# Question 3: write the base data unchanged in Parquet format.
# The format is named explicitly instead of depending on spark.sql.sources.default.
# The HDFS home directory comes from `username`, like the warehouse dir in the
# session setup, instead of a hard-coded user id.
(
    base_df.write
    .format("parquet")
    .mode("overwrite")
    .save(f"/user/{username}/week9/assignment")
)

Question 5 \
Create a pivot where states should be the rows and user_gender should be
the columns. \
The expected result looks like the table below: each value is the number of records
in that state/gender category whose phone number is not null.

In [19]:
# Question 5: pivot with states as rows and genders as columns. Each value counts
# the users in that state/gender whose phone-number list is present.
# Conditional aggregation needs only one GROUP BY, and it yields 0 (not NULL)
# when a state has no users of a gender.
# `user_phone_numbers` in users_vw is size(array). Under Spark's default settings
# size(NULL) is -1, so `>= 0` is the filter that excludes missing lists; IS NOT NULL
# does not. NULL values also fail `>= 0`.
spark.sql("""
select state,
       count(case when user_gender = 'Male' then user_id end) as Male,
       count(case when user_gender = 'Female' then user_id end) as Female
from users_vw
where state is not null
  and user_phone_numbers >= 0
group by state
order by state
""")

state,Male,Female
Alabama,9307,9178
Alaska,1882,1938
Arizona,9406,9543
Arkansas,2420,2416
California,49120,48716
Colorado,10128,10125
Connecticut,5797,5917
Delaware,1651,1654
District of Columbia,14212,14292
Florida,36692,36688


In [20]:
# Load every file under the airlines directory as CSV with default reader options.
airline_df = spark.read.csv("/public/airlines_all/airlines/*")

In [21]:
# Partition count with the default spark.sql.files.maxPartitionBytes (1919 in the recorded run).
airline_df.rdd.getNumPartitions()

1919

Setting the configuration below on the Spark session raises `spark.sql.files.maxPartitionBytes` (the maximum number of bytes Spark packs into one partition when reading files) from the default 128 MB to 140 MB. \
With the 128 MB limit, two input files together are larger than the limit, so Spark cannot merge them into a single partition. \
That is why we get 1919 partitions: one per input file. \
With the 140 MB limit, two files fit into one partition, \
so Spark can combine them pairwise and the number of partitions drops to roughly half.

In [22]:
from pyspark.sql import SparkSession
import getpass

# Raise the maximum number of bytes Spark packs into one file-scan partition from
# the 128 MB default to 140 MB (140 * 1024 * 1024 = 146800640 bytes).
# This is a runtime SQL setting, so it is set on the active session directly.
# Rebuilding via SparkSession.builder...getOrCreate() would only return the
# session that already exists, which makes a "new session" cell misleading.
MAX_PARTITION_BYTES = 140 * 1024 * 1024
spark.conf.set("spark.sql.files.maxPartitionBytes", str(MAX_PARTITION_BYTES))

In [23]:
# Re-create the DataFrame so the file scan is planned again under the updated
# partition-size setting (same source and default CSV options as before).
airline_df = spark.read.csv("/public/airlines_all/airlines/*")

In [24]:
# Partition count after raising maxPartitionBytes to 140 MB (960 in the recorded run).
airline_df.rdd.getNumPartitions()

960