2. Getting Started with Spark
This section walks through a first example of Spark.
Read CSV Data:
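Lazy operation: reading the CSV only defines a DataFrame; Spark does not load the full data set at this point. Calling take(5) is an action: it runs the read and returns the first five rows to the driver as a local list of Row objects.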
flightData2015 = df = spark.read.format("csv").option("header", "true").load("C:/Users/renau/OneDrive/02-Data Projects/09-Apache-Spark/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv")
flightData2015.take(5)
We can now call explain() to print the physical plan, which shows how Spark will execute the query:
flightData2015.sort("count").explain()
>>> FileScan csv [DEST_COUNTRY_NAME#38,ORIGIN_COUNTRY_NAME#39,count#40] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/renau/OneDrive/02-Data Projects/09-Apache-Spark/Spark-The-Defini..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:string>
By default, when we perform a shuffle, Spark outputs 200 shuffle partitions. Let’s set this value to 5 to reduce the number of output partitions from the shuffle. We do not manipulate the physical data; we only configure physical execution characteristics. This fits Spark’s functional programming model, in which the same inputs always produce the same outputs as long as the transformations on that data stay constant:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count='1'), Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count='1')]
Register the DataFrame as a temporary view so that it can be queried with SQL:
flightData2015.createOrReplaceTempView("flight_data_2015")
Query in SQL, then express the same aggregation with the DataFrame API and compare the two plans:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()
sqlWay.explain()
dataFrameWay.explain()
Import SQL Functions and Basic Manipulations
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)
>>>[Row(max(count)='986')]
Multi-transformation Query: Get Top 5 Destinations in SQL Aggregation Syntax
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|         411352.0|
|           Canada|           8399.0|
|           Mexico|           7140.0|
|   United Kingdom|           2025.0|
|            Japan|           1548.0|
+-----------------+-----------------+
Multi-transformation Query: Get Top 5 Destinations in Python DataFrame Aggregation Syntax