<a href="https://colab.research.google.com/github/karthikshanthp/colab/blob/main/Pyspark_readCSV.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Pyspark code to read a csv file, create RDD, inspect the dataframe, filter, create new columns, create calculated columns, group by data, perform aggregations, and finally write the dataframe into different file formats like CSV, PARQUET, TXT,JSON

In [2]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=56a89d8430d1de5fce7bc582fa55d1a8517bfa80d9e4f88f32e09b3e43bcc218
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
spark = SparkSession.builder.getOrCreate()




#Make sure the original.csv file is located in the google connected google drive account before processing this code cell

In [5]:

df =  spark.read.csv('/content/drive/MyDrive/Colab Notebooks/resources/original.csv', header=True)

In [6]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [7]:
df.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [8]:
schema = StructType([
    StructField('id', IntegerType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('city', StringType()),
    StructField('JobTitle', StringType()),
    StructField('salary', StringType()),
    StructField('Latitude', FloatType()),
    StructField('Longitude', FloatType())])


In [9]:
df2 = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/resources/original.csv', header=True, schema=schema)

In [10]:
df2.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('city', 'string'),
 ('JobTitle', 'string'),
 ('salary', 'string'),
 ('Latitude', 'float'),
 ('Longitude', 'float')]

In [11]:
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [12]:
df2.head(15)

[Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', city='Nowa Ruda', JobTitle='Assistant Professor', salary='$57438.18', Latitude=50.57740783691406, Longitude=16.49671745300293),
 Row(id=2, first_name='Kimberly', last_name='Von Welden', gender='Female', city='Bulgan', JobTitle='Programmer II', salary='$62846.60', Latitude=48.823158264160156, Longitude=103.52182006835938),
 Row(id=3, first_name='Alvera', last_name='Di Boldi', gender='Female', city=None, JobTitle=None, salary='$57576.52', Latitude=39.994747161865234, Longitude=116.33977508544922),
 Row(id=4, first_name='Shannon', last_name="O'Griffin", gender='Male', city='Divnomorskoye', JobTitle='Budget/Accounting Analyst II', salary='$61489.23', Latitude=44.504722595214844, Longitude=38.1300163269043),
 Row(id=5, first_name='Sherwood', last_name='Macieja', gender='Male', city='Mytishchi', JobTitle='VP Sales', salary='$63863.09', Latitude=None, Longitude=37.64899444580078),
 Row(id=6, first_name='Maris', last_name

In [13]:
df2.first()

Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', city='Nowa Ruda', JobTitle='Assistant Professor', salary='$57438.18', Latitude=50.57740783691406, Longitude=16.49671745300293)

In [14]:
#statistics about the dataframe
df2.describe().show()

+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+
|summary|               id|first_name|last_name|gender|               city|           JobTitle|   salary|          Latitude|        Longitude|
+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|     1000|               999|             1000|
|   mean|            500.5|      NULL|     NULL|  NULL|               NULL|               NULL|     NULL| 25.43151724702484|43.33756460386515|
| stddev|288.8194360957494|      NULL|     NULL|  NULL|               NULL|               NULL|     NULL|24.579082550156635| 69.4206453674681|
|    min|                1|   Abagail|    Abbay|Female|             Abéché|Account Coordinator|$10101.92|         -54.28115|       -123.04196|

In [15]:
df2.columns

['id',
 'first_name',
 'last_name',
 'gender',
 'city',
 'JobTitle',
 'salary',
 'Latitude',
 'Longitude']

In [16]:
df2.count()

1000

In [17]:
df2.distinct().count()

1000

In [18]:
df_dropped_nulls = df2.na.drop()
df_dropped_nulls.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36| 45.190517| 0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Product Manage...|$73697.10| 32.027935| 106.65711|
| 10|      Bran|   Trahear|  Male|       Arbeláez|Mechanical Sys

In [19]:
df_filtered = df2.filter(df2.JobTitle.isNotNull())
df_filtered.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36| 45.190517| 0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Product Man

In [20]:
df_handled = df2.withColumn('clean_city', when(df2.city.isNull(), 'Unknown').otherwise(df2.city))
df_handled.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+---------------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsea

In [21]:
df_drop_dups = df_handled.dropDuplicates()
df_drop_dups.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+---------------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+---------------+
|391|     Verge|  Hefferan|  Male|     Cocachacra|Community Outreac...|$90391.71|-17.091843| -71.77114|     Cocachacra|
|995|      Niki|  Ashbrook|  Male|Novozavidovskiy|    Media Manager II|$67437.88|  56.55148|  36.43472|Novozavidovskiy|
| 87|   Celisse|   Trevers|Female|     Nezlobnaya|            Operator|$20601.37| 44.120186|  43.40174|     Nezlobnaya|
|305|   Clemmie|    Gaddas|  Male|    Vila Viçosa|Administrative As...|$44675.97| 38.775417|  -7.41612|    Vila Viçosa|
|356|   Clement|  Goodrick|  Male|         Žrnovo|Software Engineer II|$74782.64|   42.9239| 17.113007|         Žrnovo|
|401|      Roma| O'Shevlan|  Male|      

In [22]:
#bitwise operators AND , OR using multiple filters
df_mult_filter = df_handled.filter((df_handled.first_name.isin('Aldin', 'Valma')) & (df_handled.city.like('%ondon')))
# df_mult_filter = df_handled.filter((df_handled.first_name.isin('Aldin', 'Valma')) | (df_handled.city.like('%ondon')))
df_mult_filter.show()

+---+----------+-------------+------+-----------+--------+---------+----------+---------+-----------+
| id|first_name|    last_name|gender|       city|JobTitle|   salary|  Latitude|Longitude| clean_city|
+---+----------+-------------+------+-----------+--------+---------+----------+---------+-----------+
|901|     Aldin|Matuszkiewicz|  Male|East London|Operator|$41468.83|-32.954933|27.931913|East London|
+---+----------+-------------+------+-----------+--------+---------+----------+---------+-----------+



In [23]:
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [24]:
df2_filter_ids = df2.filter((df2.id>10) & (df2.id<100))
df2_filter_ids.show(100)

+---+----------+-----------+------+--------------------+--------------------+---------+----------+----------+
| id|first_name|  last_name|gender|                city|            JobTitle|   salary|  Latitude| Longitude|
+---+----------+-----------+------+--------------------+--------------------+---------+----------+----------+
| 11|    Kylynn|    Lockart|Female|            El Cardo|Nuclear Power Eng...|$13604.63|     -5.85| -79.88333|
| 12|       Rey|     Meharg|Female|         Wangqingtuo|Systems Administr...|$73423.70|  39.17238| 116.93161|
| 13|      Kerr|     Braden|  Male|           Sułkowice|Compensation Analyst|$33432.99|  49.81518| 19.377174|
| 14|    Mickie|  Whanstall|  Male|         Springfield|Assistant Media P...|$50838.53|  42.10148|-72.576675|
| 15|    Kaspar|      Pally|  Male|              Chrást|  Analyst Programmer|$40163.03|  49.79233| 13.491532|
| 16|    Norbie|     Gwyllt|  Male|              Xijiao|              Editor|$32492.73| 43.494576|  5.897802|
| 17|    C

In [25]:
# df2.registerTempTable('original')
df2.createOrReplaceTempView('original')

In [28]:
df_sql = spark.sql('select * from original')
# df_sql = spark.sql('select * from original where cast(replace(salary,"$",''), float) > 10000')
df_sql.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [39]:
df_sql_filter = spark.sql('select  concat(first_name," ", last_name) as FullName,* from original where gender =  "Female"')
df_sql_filter.show()

+-------------------+---+----------+----------+------+----------------+--------------------+---------+----------+----------+
|           FullName| id|first_name| last_name|gender|            city|            JobTitle|   salary|  Latitude| Longitude|
+-------------------+---+----------+----------+------+----------------+--------------------+---------+----------+----------+
|  Melinde Shilburne|  1|   Melinde| Shilburne|Female|       Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|Kimberly Von Welden|  2|  Kimberly|Von Welden|Female|          Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|    Alvera Di Boldi|  3|    Alvera|  Di Boldi|Female|            NULL|                NULL|$57576.52| 39.994747|116.339775|
|         Maris Folk|  6|     Maris|      Folk|Female| Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|       Masha Divers|  7|     Masha|    Divers|Female|          Dachun|                NULL|$25090.87| 24.879416|118.930115|


In [47]:
from pyspark.sql.functions import *
df2 = df2.withColumn('Clean_Salary', df2.salary.substr(2,100).cast('float'))
# .filter(df2.Clean_Salary, when(df2.Clean_Salary > 7000, df2.Clean_Salary.otherwise(df2.Clean_Salary+3000)))
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|Clean_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|    57576.52|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil En

In [49]:
df2.describe().show()

+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+-----------------+
|summary|               id|first_name|last_name|gender|               city|           JobTitle|   salary|          Latitude|        Longitude|     Clean_Salary|
+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+-----------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|     1000|               999|             1000|             1000|
|   mean|            500.5|      NULL|     NULL|  NULL|               NULL|               NULL|     NULL| 25.43151724702484|43.33756460386515|55487.95562890625|
| stddev|288.8194360957494|      NULL|     NULL|  NULL|               NULL|               NULL|     NULL|24.579082550156635| 69.4206453674681|25855.22985831031|
|    min|                1|   Abag

In [56]:
df2 = df2.withColumn('monthly_salary', (df2.Clean_Salary/12).cast('float'))
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+--------------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|Clean_Salary|monthly_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+--------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18|      4786.515|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|      5237.217|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|    57576.52|     4798.0435|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|     5124.1025|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$6386

In [65]:
df2 = df2.withColumn('are_they_female', when(df2.gender == 'female', 'yes').otherwise('no'))
# df2 = df2.withColumn('are_they_female', when((upper(df2.gender)) == 'FEMALE'), 'yes')).otherwise('no'))
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+--------------+---------------+
| id|first_name| last_name|gender|           city|            JobTitle|   salary|  Latitude| Longitude|Clean_Salary|monthly_salary|are_they_female|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+--------------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18|      4786.515|             no|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|      5237.217|             no|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|    57576.52|     4798.0435|             no|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489

In [66]:
import pyspark.sql.functions as sql_func

In [72]:
# df3 = df2.groupBy('gender').agg(sql_func.sum('monthly_salary').alias('total_monthly_salary_by_gender').cast('float'))
df3 = df2.groupBy('gender').agg(sql_func.sum('monthly_salary').cast('float').alias('total_monthly_salary_by_gender'))
df3.show()

+------+------------------------------+
|gender|total_monthly_salary_by_gender|
+------+------------------------------+
|Female|                     2280376.8|
|  Male|                     2343619.8|
+------+------------------------------+



In [84]:
df3 = df2.groupBy('gender','city').agg(sql_func.sum('clean_salary').alias('total_salary'),
                                       sql_func.avg('clean_salary').alias('avg_salary'),
                                       sql_func.max('clean_salary').alias('max_salary'),
                                       sql_func.min('clean_salary').alias('min_salary'),
                                       sql_func.count('clean_salary').alias('max_count_emp')

                                       )
# df3.show()
df3.sort(desc('max_count_emp')).show()

+------+------------+----------------+-----------------+----------+----------+-------------+
|gender|        city|    total_salary|       avg_salary|max_salary|min_salary|max_count_emp|
+------+------------+----------------+-----------------+----------+----------+-------------+
|  Male|     Cuogang|202341.990234375|50585.49755859375|  82505.36|  11832.43|            4|
|Female| Trollhättan|106623.369140625| 53311.6845703125|   79792.9|  26830.47|            2|
|  Male|Kuala Lumpur|  121664.8828125|   60832.44140625|   79030.6|  42634.28|            2|
|  Male|      Melaka| 135305.95703125|  67652.978515625|  94743.26|   40562.7|            2|
|  Male|      Xijiao|46200.7607421875|23100.38037109375|  32492.73|  13708.03|            2|
|  Male| Tanjungluar|  38955.69921875|  19477.849609375|  22089.29|  16866.41|            2|
|Female|   Mieścisko| 70627.009765625| 35313.5048828125|  51862.48|  18764.53|            2|
|Female|       Copán|      110448.125|       55224.0625|  70378.23|   

Writing files to destination formats like CSV, TXT, PARQUET, AVRO

In [91]:
df3.write.mode('overwrite').option("header",True).csv('/content/drive/MyDrive/Colab Notebooks/resources/output/employees_by_gender_city.csv')

In [92]:
df3.write.mode('overwrite').option("header", True).json('/content/drive/MyDrive/Colab Notebooks/resources/output/employees_by_gender_city.json')

In [93]:
df3.write.mode('overwrite').option("header",True).parquet('/content/drive/MyDrive/Colab Notebooks/resources/output/employees_by_gender_city.parquet')

We cant write into AVRO format directly from pyspark library

In [96]:
# import com.databricks.spark.avro._
# import org.apache.spark.sql.SparkSession
# df3.write.mode('overwrite').option("header",True).avro('/content/drive/MyDrive/Colab Notebooks/resources/output/employees_by_gender_city.avro')

Read a new csv file


In [104]:
df =  spark.read.csv('/content/drive/MyDrive/Colab Notebooks/resources/input/challenge.csv', header=True)
df.show()

+---------------+--------------+-----------------+----------+
|     ip_address|       Country|      Domain Name|Bytes_used|
+---------------+--------------+-----------------+----------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|
| 119.239.207.13|         China|         youtu.be|        51|
|  68.69.217.210|         China|        adobe.com|        10|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|
|   211.13.10.68|     Indonesia|          hud.gov|        29|
|   239.80.21.97|      Suriname|       smh.com.au|       218|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|
| 127.242.24.138|         China| surveymonkey.com|       123|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|
|   237.54.11.63|         China|       amazon.com|        83|
| 252.141.157.25|         Japan|      cornell.edu|       374|
|185.220.128.248|       Belgium|       weebly.com|       389|
|   151.77.19.45|   Afghanistan|independent.co.uk|       282|
|  9.161

In [105]:
df = df.withColumn('isMexico', when(df.Country == 'Mexico', 'yes').otherwise('no'))
df.show()

+---------------+--------------+-----------------+----------+--------+
|     ip_address|       Country|      Domain Name|Bytes_used|isMexico|
+---------------+--------------+-----------------+----------+--------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|      no|
| 119.239.207.13|         China|         youtu.be|        51|      no|
|  68.69.217.210|         China|        adobe.com|        10|      no|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|      no|
|   211.13.10.68|     Indonesia|          hud.gov|        29|      no|
|   239.80.21.97|      Suriname|       smh.com.au|       218|      no|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|      no|
| 127.242.24.138|         China| surveymonkey.com|       123|      no|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|      no|
|   237.54.11.63|         China|       amazon.com|        83|      no|
| 252.141.157.25|         Japan|      cornell.edu|       374|      no|
|185.2

In [106]:
df_isMexico = df.groupBy('isMexico').agg(sql_func.sum('bytes_used').alias('total_bytes_used'))
df_isMexico.show()

+--------+----------------+
|isMexico|total_bytes_used|
+--------+----------------+
|      no|        508076.0|
|     yes|          6293.0|
+--------+----------------+



In [109]:
df_country_ip = df.groupBy('Country').agg(sql_func.countDistinct('ip_address').alias('count_distinct_ips'))
df_country_ip.sort(desc('count_distinct_ips')).show()

+--------------+------------------+
|       Country|count_distinct_ips|
+--------------+------------------+
|         China|               172|
|     Indonesia|               114|
|   Philippines|                65|
|        Russia|                56|
|        Brazil|                35|
|        Poland|                31|
|        Sweden|                28|
|         Japan|                25|
|Czech Republic|                23|
|      Portugal|                23|
|        France|                21|
|          Peru|                19|
|      Colombia|                17|
| United States|                15|
|     Argentina|                14|
|       Ukraine|                14|
|        Mexico|                13|
|      Thailand|                12|
|       Nigeria|                11|
|        Canada|                11|
+--------------+------------------+
only showing top 20 rows



In [111]:
df.write.mode('overwrite').option("header",True).csv('/content/drive/MyDrive/Colab Notebooks/resources/output/challenge.csv')
df_country_ip.sort(desc('count_distinct_ips')).write.mode('overwrite').option("header",True).csv('/content/drive/MyDrive/Colab Notebooks/resources/output/challenge_country_ip.csv')
#