<a href="https://colab.research.google.com/github/khaledn66/pyspark2/blob/main/_aggregating_dataframes_code_along_checkpoint_ipynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aggregating DataFrames in PySpark

In this lecture we will be going over how to aggregate dataframes in Pyspark. The commands we will learn here will be super useful for doing quality checks on your dataframes and answering more simiplistic business questions with you data.

So let's get to it! Here is what we will cover today:

 - GroupBy
 - Pivot
 - Aggregate methods
 - Combos of each

In [None]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("aggregate").getOrCreate()

# Новый раздел

In [None]:
spark

In [None]:
import os
os.listdir("/content/drive/My Drive/content")

['nyc_air_bnb.csv']

In [None]:
import os

# Get the current working directory
current_directory = os.getcwd()
print(current_directory)
from google.colab import drive
drive.mount('/content/drive',force_remount=True)
file_path = "/content/drive/My Drive/'nyc_air_bnb.csv'"
airbnb = spark.read.csv(file_path, inferSchema=True, header=True)

/content
Mounted at /content/drive


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/drive/My Drive/'nyc_air_bnb.csv'.

In [None]:
from google.colab import files

# Upload files
uploaded = files.upload()

Saving students.csv to students.csv


In [None]:
import pandas as pd

# Read the CSV using pandas
df = pd.read_csv("/content/drive/My Drive/content/nyc_air_bnb.csv")
print(df.head())  # Display the first few rows of the DataFrame

     id                                              name  host_id  \
0  2539                Clean & quiet apt home by the park     2787   
1  2595                             Skylit Midtown Castle     2845   
2  3647               THE VILLAGE OF HARLEM....NEW YORK !     4632   
3  3831                   Cozy Entire Floor of Brownstone     4869   
4  5022  Entire Apt: Spacious Studio/Loft by central park     7192   

     host_name neighbourhood_group neighbourhood  latitude  longitude  \
0         John            Brooklyn    Kensington  40.64749  -73.97237   
1     Jennifer           Manhattan       Midtown  40.75362  -73.98377   
2    Elisabeth           Manhattan        Harlem  40.80902  -73.94190   
3  LisaRoxanne            Brooklyn  Clinton Hill  40.68514  -73.95976   
4        Laura           Manhattan   East Harlem  40.79851  -73.94399   

         room_type  price  minimum_nights  number_of_reviews last_review  \
0     Private room    149               1                  9  20

In [None]:
# Now read the new CSV file into PySpark
# Save the DataFrame to a new CSV file
df.to_csv("/content/nyc_air_bnb_copy.csv", index=False)

airbnb = spark.read.csv("/content/nyc_air_bnb_copy.csv", inferSchema=True, header=True)


In [None]:
airbnb.show()

+----+--------------------+-------+----------------+-------------------+------------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|       host_name|neighbourhood_group|     neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+----------------+-------------------+------------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|            John|           Brooklyn|        Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|           365.0|
|2595|Skylit Midtown Ca.

In [None]:
airbnb.limit(4).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365.0
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355.0
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365.0
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194.0


In [None]:
airbnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)



In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df = airbnb.withColumn("price", airbnb["price"].cast(IntegerType())) \
        .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType())) \
        .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType())) \
        .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType())) \
        .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))
#QA
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: double (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.0,6,365.0
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.0,2,355.0
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365.0
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.0,1,194.0
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.0,1,0.0


In [None]:
df.groupBy("neighbourhood_group").count().show(7)

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|         Douglaston|    1|
|             Queens| 5630|
|              Nadia|    1|
|            Midtown|    4|
|    Jackson Heights|    2|
|     Hell's Kitchen|    7|
|  Greenwich Village|    2|
+-------------------+-----+
only showing top 7 rows



In [None]:
df.groupBy("neighbourhood_group").min("price").show(7)

+-------------------+----------+
|neighbourhood_group|min(price)|
+-------------------+----------+
|         Douglaston|         1|
|             Queens|        10|
|              Nadia|      null|
|            Midtown|         2|
|    Jackson Heights|         2|
|     Hell's Kitchen|         1|
|  Greenwich Village|        31|
+-------------------+----------+
only showing top 7 rows



In [None]:
df.groupBy("neighbourhood").agg({'price':'mean'}).show(5)

+-------------+----------+
|neighbourhood|avg(price)|
+-------------+----------+
|       Corona| 59.171875|
| Richmondtown|      78.0|
| Prince's Bay|     409.5|
|  Westerleigh|      71.5|
|   Mill Basin|    179.75|
+-------------+----------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import *

In [None]:
df.groupBy("neighbourhood").agg(min(df.price).alias("Min"),max(df.price)).show()

+-----------------+---+----------+
|    neighbourhood|Min|max(price)|
+-----------------+---+----------+
|           Corona| 23|       359|
|     Richmondtown| 78|        78|
|     Prince's Bay| 85|      1250|
|      Westerleigh| 40|       103|
|       Mill Basin| 85|       299|
|         40.76199|  1|         1|
|     Civic Center| 50|       950|
|         40.83166|  1|         1|
|       Douglaston| 40|       178|
|       Mount Hope| 24|       250|
|          40.7578|  1|         1|
|         40.80958|  1|         1|
|      Marble Hill| 40|       274|
|        Rego Park| 21|       300|
|         40.81225|  2|         2|
|         40.76805|  1|         1|
|         40.64936|  1|         1|
|    Dyker Heights| 30|       170|
|         40.76364|  2|         2|
|Kew Gardens Hills| 40|       399|
+-----------------+---+----------+
only showing top 20 rows



In [None]:
summary = df.summary("count","min","25%","75%","max")
summary.limit(4).toPandas()

Unnamed: 0,summary,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,count,49079,49047,48894,48873,48894.0,48894.0,48894.0,48894.0,48894.0,48887,48891,48738,38845.0,38858,48891,48737
1,min,"12 mins Manhattan""",1 Bed Apt in Utopic Williamsburg,"Heart of Greenwich Village""","very clean studio app""",194716858.0,2.0,-73.72247,-73.71299,-73.90783,-74,0,0,-73.94134,0,0,0
2,25%,9470528.0,2.4544724E7,7797690.0,475.0,194716858.0,40.68771,40.68981,-73.98309,56.0,69,1,1,0.76,0,1,0
3,75%,2.9151631E7,1.74786681E8,1.07434423E8,3.3499491E7,197400421.0,40.78304,40.76299,-73.93638,145.0,175,5,23,3.24,2,2,226


In [None]:
limit_summary = df.select("price","minimum_nights","number_of_reviews").summary("count","min","25%","75%","max")
limit_summary.limit(6).toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews
0,count,48887,48891,48738
1,min,-74,0,0
2,25%,69,1,1
3,75%,175,5,23
4,max,10000,1250,629


In [None]:
df.agg(min(df.price).alias("Min"),max(df.price)).show()

+---+----------+
|Min|max(price)|
+---+----------+
|-74|     10000|
+---+----------+



In [None]:
df.groupBy("neighbourhood").show()

AttributeError: 'GroupedData' object has no attribute 'show'

In [None]:
df.select(countDistinct("neighbourhood_group"),avg('price'),stddev('price')).toPandas()

Unnamed: 0,count(DISTINCT neighbourhood_group),avg(price),stddev_samp(price)
0,77,152.222984,238.541467


In [None]:
df.filter("room_type='Shared room'").groupBy("room_type").pivot("neighbourhood_group",["Queens","Brooklyn"]).count().show(10)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|Shared room|   198|     413|
+-----------+------+--------+



In [None]:
df.filter("room_type='Shared room'").groupBy("room_type").pivot("neighbourhood_group",["Queens","Brooklyn"]).agg(min(df.price),max(df.price)).toPandas()

Unnamed: 0,room_type,Queens_min(price),Queens_max(price),Brooklyn_min(price),Brooklyn_max(price)
0,Shared room,11,1800,0,725


In [None]:
from google.colab import files

# Upload files
uploaded = files.upload()

Saving ._Aggregating_DataFrames_in_PySpark_HW.py to ._Aggregating_DataFrames_in_PySpark_HW.py


In [None]:
import os
os.listdir("/content/drive/My Drive/content")

['nyc_air_bnb.csv']

In [None]:
import os
print(os.listdir("/content"))

['.config', 'nyc_air_bnb_copy.csv', 'fifa19.csv', '._Aggregating_DataFrames_in_PySpark_HW.py', 'drive', 'students.csv', 'sample_data']


In [None]:
# Replace 'your_file.py' with the actual filename
!python /content/._Aggregating_DataFrames_in_PySpark_HW.py


In [None]:
# Replace 'your_file.py' with the actual filename
with open('/content/._Aggregating_DataFrames_in_PySpark_HW.py', 'r',encoding='latin1') as file:
 content = file.read()
 print(content)

    Mac OS X            	   2   î                                            ATTR          ´   l                  ´   H  com.apple.macl      ü   $  com.apple.quarantine  Õº~öBOº"|âí                                                      q/0082;5fe1e641;iZip\x20Unarchiver; 


In [None]:
!pip install jupytext


Collecting jupytext
  Downloading jupytext-1.16.4-py3-none-any.whl.metadata (13 kB)
Downloading jupytext-1.16.4-py3-none-any.whl (153 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.5/153.5 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jupytext
Successfully installed jupytext-1.16.4


In [None]:
!jupytext --to notebook /content/._Aggregating_DataFrames_in_PySpark_HW.py


[jupytext] Reading /content/._Aggregating_DataFrames_in_PySpark_HW.py in format py
Traceback (most recent call last):
  File "/usr/local/bin/jupytext", line 8, in <module>
    sys.exit(jupytext())
  File "/usr/local/lib/python3.10/dist-packages/jupytext/cli.py", line 497, in jupytext
    exit_code += jupytext_single_file(nb_file, args, log)
  File "/usr/local/lib/python3.10/dist-packages/jupytext/cli.py", line 561, in jupytext_single_file
    notebook = read(nb_file, fmt=fmt, config=config)
  File "/usr/local/lib/python3.10/dist-packages/jupytext/jupytext.py", line 432, in read
    return read(stream, as_version=as_version, fmt=fmt, config=config, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/jupytext/jupytext.py", line 441, in read
    return reads(fp.read(), fmt, config=config, **kwargs)
  File "/usr/lib/python3.10/codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xee

In [None]:
import nbformat
from nbformat.v4 import new_notebook, new_code_cell

# Try reading with a different encoding
with open('/content/._Aggregating_DataFrames_in_PySpark_HW.py', 'r', encoding='latin1') as file:
    code = file.read()

# Create the notebook
nb = new_notebook(cells=[new_code_cell(code)])

# Save it as a .ipynb file
with open('/content/._Aggregating_DataFrames_in_PySpark_HW.py', 'w') as f:
    nbformat.write(nb, f)


In [None]:
import os
print(os.listdir('/content'))


['.config', 'nyc_air_bnb_copy.csv', 'fifa19.csv', '._Aggregating_DataFrames_in_PySpark_HW.py', 'drive', 'students.csv', 'sample_data']


In [None]:
import nbformat
from nbformat.v4 import new_notebook, new_code_cell

# Attempt to read the Python file with a specific encoding
try:
    with open('/content/._Aggregating_DataFrames_in_PySpark_HW.py', 'r', encoding='latin1') as file:
        code = file.read()
    print("File read successfully.")

    # Create a notebook and add code
    nb = new_notebook(cells=[new_code_cell(code)])

    # Save the notebook
    with open('/content/._Aggregating_DataFrames_in_PySpark_HW.py', 'w') as f:
        nbformat.write(nb, f)
    print("Notebook saved successfully as 'your_file.ipynb'.")

except Exception as e:
    print(f"An error occurred: {e}")


File read successfully.
Notebook saved successfully as 'your_file.ipynb'.


In [None]:
import os
print(os.listdir('/content'))


['.config', 'nyc_air_bnb_copy.csv', 'fifa19.csv', '._Aggregating_DataFrames_in_PySpark_HW.py', 'drive', 'students.csv', 'sample_data']


In [None]:
!ls -ld /content


drwxr-xr-x 1 root root 4096 Oct 27 19:17 /content


In [None]:
import nbformat
from nbformat.v4 import new_notebook, new_code_cell

# Specify exact path
output_path = '/content/._Aggregating_DataFrames_in_PySpark_HW.ipynb'

# Read and save
with open('/content/._Aggregating_DataFrames_in_PySpark_HW.py', 'r', encoding='latin1') as file:
    code = file.read()

nb = new_notebook(cells=[new_code_cell(code)])
with open(output_path, 'w') as f:
    nbformat.write(nb, f)

# Check if the file now exists
print("Files in /content:")
print(os.listdir("/content"))


Files in /content:
['.config', 'nyc_air_bnb_copy.csv', 'fifa19.csv', '._Aggregating_DataFrames_in_PySpark_HW.py', 'drive', '._Aggregating_DataFrames_in_PySpark_HW.ipynb', 'students.csv', 'sample_data']


In [None]:
ls -la /content

total 15952
drwxr-xr-x 1 root root    4096 Oct 27 20:15 [0m[01;34m.[0m/
drwxr-xr-x 1 root root    4096 Oct 27 19:29 [01;34m..[0m/
-rw-r--r-- 1 root root    8232 Oct 27 20:15 ._Aggregating_DataFrames_in_PySpark_HW.ipynb
-rw-r--r-- 1 root root    4879 Oct 27 20:06 ._Aggregating_DataFrames_in_PySpark_HW.py
drwxr-xr-x 4 root root    4096 Oct 24 13:20 [01;34m.config[0m/
drwx------ 6 root root    4096 Oct 27 16:06 [01;34mdrive[0m/
-rw-r--r-- 1 root root 9140113 Oct 27 16:23 fifa19.csv
-rw-r--r-- 1 root root 7077687 Oct 27 16:12 nyc_air_bnb_copy.csv
drwxr-xr-x 1 root root    4096 Oct 24 13:20 [01;34msample_data[0m/
-rw-r--r-- 1 root root   72036 Oct 27 16:23 students.csv


In [None]:
!mv /content/._Aggregating_DataFrames_in_PySpark_HW.ipynb /content/Aggregating_DataFrames_in_PySpark_HW.ipynb
!mv /content/._Aggregating_DataFrames_in_PySpark_HW.py /content/Aggregating_DataFrames_in


In [None]:
!ls -la /content/notebooks


ls: cannot access '/content/notebooks': No such file or directory
