PySpark làm quen với DataFrame

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
from datetime import datetime, date
from pyspark.sql import Row

df = spark.createDataFrame(
    [
      Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
      Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
      Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
    ]
)

In [None]:
df.show()

#Limit row show
df.show(2) # Show 2 row

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 2 rows



In [None]:
df.columns

df.select('a', 'b', 'c').describe().show()

+-------+------------------+------------------+-------+
|summary|                 a|                 b|      c|
+-------+------------------+------------------+-------+
|  count|                 3|                 3|      3|
|   mean|2.3333333333333335|3.3333333333333335|   NULL|
| stddev|1.5275252316519465|1.5275252316519465|   NULL|
|    min|                 1|               2.0|string1|
|    max|                 4|               5.0|string3|
+-------+------------------+------------------+-------+



In [None]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,4,5.0,string3,2000-03-01,2000-01-03 12:00:00


In [None]:
from pyspark.sql.functions import upper
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [None]:
df.select(df.b).show()

+---+
|  b|
+---+
|2.0|
|3.0|
|5.0|
+---+



PySpark với function

In [None]:
def pandas_filter_func(interator):
  for pandas_df in interator:
    yield pandas_df[pandas_df.a == 4]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



Group Data

In [None]:
df1 = spark.createDataFrame(
    [
      ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
      ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
      ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]
    ], schema=['color', 'fruit', 'v1', 'v2']
)
df1.toPandas()

Unnamed: 0,color,fruit,v1,v2
0,red,banana,1,10
1,blue,banana,2,20
2,red,carrot,3,30
3,blue,grape,4,40
4,red,carrot,5,50
5,black,carrot,6,60
6,red,banana,7,70
7,red,grape,8,80


In [31]:
df1.groupby('fruit').sum().show()

+------+-------+-------+
| fruit|sum(v1)|sum(v2)|
+------+-------+-------+
| grape|     12|    120|
|banana|     10|    100|
|carrot|     14|    140|
+------+-------+-------+



In [None]:
def plus_mean(pandas_df):
    print(pandas_df.v1.mean())
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean(), v2=pandas_df.v2 - pandas_df.v2.mean())
df1.groupBy('fruit').applyInPandas(plus_mean, schema=df1.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -2| 10|
| blue|banana| -1| 20|
|  red|banana|  3| 70|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|black|carrot|  1| 60|
| blue| grape| -2| 40|
|  red| grape|  2| 80|
+-----+------+---+---+



Practice with sample CSV File

In [36]:
zip_code_df = spark.read.option('header', 'true').csv('zipcodes.csv', inferSchema=True)
zip_code_df.show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           NULL|               NULL|      NULL|         NULL|
|           2|    70

In [37]:
zip_code_df.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: integer (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



In [40]:
zip_code_df.select('RecordNumber','ZipCode','State').show()

+------------+-------+-----+
|RecordNumber|ZipCode|State|
+------------+-------+-----+
|           1|    704|   PR|
|           2|    704|   PR|
|          10|    709|   PR|
|       61391|  76166|   TX|
|       61392|  76177|   TX|
|       61393|  76177|   TX|
|           4|    704|   PR|
|       39827|  85209|   AZ|
|       39828|  85210|   AZ|
|       49345|  32046|   FL|
|       49346|  34445|   FL|
|       49347|  32564|   FL|
|       49348|  34487|   FL|
|          10|    708|   PR|
|           3|    704|   PR|
|       54354|  36275|   AL|
|       54355|  35146|   AL|
|       54356|  35585|   AL|
|       76511|  27007|   NC|
|       76512|  27203|   NC|
+------------+-------+-----+
only showing top 20 rows



In [43]:
zip_code_df.describe().show()

+-------+-----------------+-----------------+-----------+---------------+-----+------------+-----------------+------------------+-------------------+-------------------+-------------------+-----------+-------+-------------------+--------------------+-----------------+-------------------+--------------------+-------------+
|summary|     RecordNumber|          Zipcode|ZipCodeType|           City|State|LocationType|              Lat|              Long|              Xaxis|              Yaxis|              Zaxis|WorldRegion|Country|       LocationText|            Location|  TaxReturnsFiled|EstimatedPopulation|          TotalWages|        Notes|
+-------+-----------------+-----------------+-----------+---------------+-----+------------+-----------------+------------------+-------------------+-------------------+-------------------+-----------+-------+-------------------+--------------------+-----------------+-------------------+--------------------+-------------+
|  count|               21| 

In [53]:
from pyspark.sql.functions import concat, lit
## Add new column
zip_code_df.withColumn('State(ST)',  concat(lit('ST-'),zip_code_df['State'])).show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+---------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|State(ST)|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+---------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           NULL|               NULL|      NULL|     

In [55]:
# Drop column
zip_code_df.drop('State(ST)').show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           NULL|               NULL|      NULL|         NULL|
|           2|    70

In [58]:
# Rename column

zip_code_df.withColumnRenamed('ZipCode', 'Zip').show()

+------------+-----+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|  Zip|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-----+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|  704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           NULL|               NULL|      NULL|         NULL|
|           2|  704|   STAND

In [69]:
# Delete NULL Row
clean_df = zip_code_df.na.drop(how='all',subset=['TaxReturnsFiled', 'EstimatedPopulation','TotalWages','Notes'])
clean_df.show()

+------------+-------+-----------+-----------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+---------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|       City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|   LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-----------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+---------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|       61392|  76177|   STANDARD| FORT WORTH|   TX|       PRIMARY|32.75| -97.33| -0.1|-0.83| 0.54|         NA|     US| Fort Worth, TX| NA-US-TX-FORT WORTH|        false|           2126|               4053| 122396986|         NULL|
|       61393|  76177|   STANDARD|   FT WORTH|   TX|    ACCEPTABLE|32.75

In [105]:
from pyspark.sql.functions import sum as _sum, col, round, avg, regexp_replace
# Top 3 thanh pho co tien luong cao nhat
clean_df.groupBy('City').agg(_sum('TotalWages').alias('Total Wages')).orderBy('Total Wages', ascending=False).limit(3).show()

+-----------+-----------+
|       City|Total Wages|
+-----------+-----------+
|       MESA| 1034793195|
|   ASHEBORO|  245796791|
|SPRINGVILLE|  172127599|
+-----------+-----------+



In [101]:
# % Dan so da nop to khai thue tren tong dan so uoc tinh
avg_tax_return = clean_df.groupBy('City').agg(_sum('EstimatedPopulation').alias('TotalPopulation'), _sum('TaxReturnsFiled').alias('TotalTaxFiledReturn')).withColumn('TaxFilledPercent', round((col('TotalTaxFiledReturn') / col('TotalPopulation')) * 100, 2)).withColumn('TaxFilledPercent', concat(col('TaxFilledPercent'),lit(' %'))).orderBy('TaxFilledPercent', ascending=False)

avg_tax_return.toPandas()



Unnamed: 0,City,TotalPopulation,TotalTaxFiledReturn,TaxFilledPercent
0,MESA,52329,29336,56.06 %
1,HOLT,2190,1207,55.11 %
2,ASHEBORO,17044,9390,55.09 %
3,HILLIARD,7443,3922,52.69 %
4,FT WORTH,4053,2126,52.45 %
5,FORT WORTH,4053,2126,52.45 %
6,SPRINGVILLE,7845,4046,51.57 %
7,ASH HILL,1666,842,50.54 %
8,SPRUCE PINE,1209,610,50.45 %


In [112]:
format_df = avg_tax_return.withColumn('TaxFilledPercent', regexp_replace('TaxFilledPercent', '%', '').cast('float')).select(round(avg('TaxFilledPercent'),2).alias('AverageTaxFilledPercent'))
format_df.toPandas()

Unnamed: 0,AverageTaxFilledPercent
0,52.93
