"""
How Spark manages data and how you can read and write tables from Python.
\ What is Spark, anyway? /
| parallel computation | ( split data across a cluster )
Spark is a platform for cluster computing. It lets you spread data and computations over clusters with multiple nodes.
Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.
Parallel computation can make certain types of programming tasks much faster.
| Using Spark in Python |
- cluster : <remote machine that connects all nodes>
- master : ( main computer ) <manages splitting up the data and the computations>
- worker : ( the rest of the computers in the cluster ) <receive calculations, return results>
. create connection : create an instance of the <SparkContext> class, usually called <sc>
. attributes : specified with the <SparkConf()> constructor (see the sketch after these notes)
| create SparkSession |
# Import SparkSession
> from pyspark.sql import SparkSession
# Create (or retrieve) a SparkSession
> my_spark = SparkSession.builder.getOrCreate()
# Print the tables in the catalog
> print(spark.catalog.listTables())
| SparkSession attributes |
# always <SparkSessionName>.
- catalog: ( extract and view table data )
- .listTables()
# returns the names of all tables in the cluster as a list
> spark.catalog.listTables()
- .read # reads different data sources into Spark DataFrames
> spark.read.csv(file_path, header=True)
| SparkSession & DataFrame methods |
# SparkSession methods ( always <SparkSessionName>. )
- .sql() -> run a query ( <takes> a query 'string' <returns> the results as a DataFrame )
- .createDataFrame() -> <takes> a pandas DataFrame and <returns> a Spark DataFrame (stored locally, not in the catalog)
# DataFrame methods ( always <DataFrameName>. )
- .show() -> print the DataFrame
- .toPandas() -> returns the corresponding 'pandas' DataFrame
- .createTempView() -> adds the DataFrame to the catalog, but only temporarily (for this session)
- .createOrReplaceTempView("temp") -> creates the temp table if it doesn't exist yet, or updates it if it does
"""
#|
#|
### How do you connect to a Spark cluster from PySpark?
# ANSW: Create an instance of the SparkContext class.
#|
#|
### Examining The SparkContext
# Verify SparkContext in environment
print(sc)
# Print Spark version
print(sc.version)
#- <SparkContext master=local[*] appName=pyspark-shell>
#- 3.2.0
#|
#|
"""
\ Using Spark DataFrames /
. Spark's core data structure : the Resilient Distributed Dataset (RDD)
. the Spark DataFrame is built on top of RDDs and behaves like a SQL table
# DataFrames are more optimized for complicated operations than RDDs.
- to start working with DataFrames, create a <SparkSession> object ( the interface, 'spark' )
  from your <SparkContext> ( the connection, 'sc' )
  (see the short sketch after these notes)
"""
#|
#|
### Which of the following is an advantage of Spark DataFrames over RDDs?
# ANSW: Operations using DataFrames are automatically optimized.
#|
#|
### Creating a SparkSession
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
# Create my_spark
my_spark = SparkSession.builder.getOrCreate()
# Print my_spark
print(my_spark)
#|
#|
### Viewing tables
# Print the tables in the catalog
print(spark.catalog.listTables())
#|
#|
### Are you query-ious?
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"
# Get the first 10 rows of flights
flights10 = spark.sql(query)
# Show the results
flights10.show()
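# Not part of the original exercise: the same rows can also be pulled through the
# DataFrame API, assuming 'flights' is registered in the catalog as above
flights10_df = spark.table("flights").limit(10)
flights10_df.show()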
#|
#|
### Pandafy a Spark DataFrame
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
# Run the query
flight_counts = spark.sql(query)
# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()
# Print the head of pd_counts
print(pd_counts.head())
#|
#|
### Put some Spark in your data
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))
# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)
# Examine the tables in the catalog
print(spark.catalog.listTables())
# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")
# Examine the tables in the catalog again
print(spark.catalog.listTables())
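# Not part of the original exercise: once "temp" is registered, it can be queried
# with spark.sql() like any other table (this query is just illustrative)
print(spark.sql("SELECT * FROM temp LIMIT 5").toPandas())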
#|
#|
### Dropping the middle man
# Don't change this file path
file_path = "/usr/local/share/datasets/airports.csv"
# Read in the airports data
airports = spark.read.csv(file_path, header=True)  # header=True: the first row holds the column names
# Show the data
airports.show()
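### Peeking at the schema (extra sketch)
# By default spark.read.csv() reads every column as a string; a small sketch,
# assuming the same airports.csv file, of letting Spark infer the column types:
airports_typed = spark.read.csv(file_path, header=True, inferSchema=True)
# Print the inferred column names and types
airports_typed.printSchema()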