Skip to content

Commit 9affb30

Browse files
authored
Update II. Manipulating data.py
1 parent a9fef7e commit 9affb30

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed

Introduction to PySpark/II. Manipulating data.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@
1818
| > selectStr = flights.select("tailnum", "origin", "dest")
1919
| > flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
2020
- .withColumn() -> returns a new DataFrame with all existing columns plus the newly defined one.
21+
-> create new df column
22+
- .withColumnRenamed
23+
-> rename columns
24+
25+
- GroupedData
26+
|
27+
- .agg() -> pass an aggregate column expression that uses any of the aggregate functions from the pyspark.sql.functions submodule.
28+
# Import pyspark.sql.functions as F
29+
> import pyspark.sql.functions as F
30+
- F.stddev -> standard deviation
31+
32+
- .join() -> takes three arguments. 1.the second DataFrame to join, 2. on == key column(s) as a string, 3. how == specifies kind of join how="leftouter"
2133
"""
2234
#|
2335
#|
@@ -84,3 +96,72 @@
8496
# Create the same table using a SQL expression
8597
speed2 = flights.selectExpr(
8698
"origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
#|
#|
### Aggregating
# Shortest flight departing PDX, measured by distance.
# Filter by referencing the column object directly, not by passing a SQL string.
pdx_flights = flights.filter(flights.origin == "PDX")
pdx_flights.groupBy().min("distance").show()

# Longest flight departing SEA, measured by air time.
sea_flights = flights.filter(flights.origin == "SEA")
sea_flights.groupBy().max("air_time").show()
#|
#|
### Aggregating II
# Average duration of Delta (carrier "DL") flights departing SEA.
delta_from_sea = flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA")
delta_from_sea.groupBy().avg("air_time").show()

# Total hours in the air: derive an hours column, then sum it over all rows.
with_hours = flights.withColumn("duration_hrs", flights.air_time / 60)
with_hours.groupBy().sum("duration_hrs").show()
#|
#|
### Grouping and Aggregating I
# Group the flights by aircraft tail number.
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made.
plane_flight_counts = by_plane.count()
plane_flight_counts.show()

# Group the flights by origin airport.
by_origin = flights.groupBy("origin")

# Average flight duration out of PDX and SEA.
avg_air_time_by_origin = by_origin.avg("air_time")
avg_air_time_by_origin.show()
#|
#|
### Grouping and Aggregating II
# Import pyspark.sql.functions as F to get the full set of aggregate functions.
import pyspark.sql.functions as F

# Group by month and destination.
by_month_dest = flights.groupBy("month", "dest")

# Average departure delay for each (month, destination) pair.
by_month_dest.agg(F.avg("dep_delay")).show()

# Standard deviation of the departure delay (F.stddev = sample standard deviation).
by_month_dest.agg(F.stddev("dep_delay")).show()
#|
#|
### joining
# Quiz: pick the false statement about joins.
"""Which of the following is not true?

Joins combine tables.
Joins add information to a table.
Storing information in separate tables can reduce repetition.
There is only one kind of join."""
# ANSWER: "There is only one kind of join." is the false statement —
# there are many (inner, left/right/full outer, semi, anti, ...).
#|
#|
### Joining II
# Examine the airports data. DataFrame.show() prints the table itself and
# returns None, so wrapping it in print() (as the original did) would emit a
# spurious "None" line after the table.
airports.show()

# Rename the faa column so it matches the key column in flights.
airports = airports.withColumnRenamed('faa', 'dest')

# Join on the destination airport code; a left outer join keeps every flight
# row even when no matching airport row exists.
flights_with_airports = flights.join(airports, on='dest', how='leftouter')

# Examine the joined DataFrame.
flights_with_airports.show()

0 commit comments

Comments
 (0)