- pyspark-note
  - Concept
  - RDD Basic operation
  - Deal with JSON data
  - Deal with `.parquet`
  - Spark Dataframe
    - Create sparkdf by reading `.csv`
    - [ing] Speed Up Reading .csv/.json with schema
    - Merge/Union Two DataFrames with Different Columns or Schema
    - Rename Columns
    - `.printSchema()` in df
    - Deal with `datetime` / `timestamp`
    - `F.create_map()` in `df.withColumn()`, create a `dict` column
    - `.groupBy().count()`
    - `groupBy().agg()`
    - `groupBy().collect_set()` / `groupBy().collect_list()`
    - Combine array of map to single map, `groupBy().collect_list()`
    - `df.createOrReplaceTempView("sql_table")`, allows to run SQL queries once register `df` as temporary tables
    - Window functions
    - `.join()` / `spark.sql()` dataframes
    - `df1.union(df2)` concat 2 dataframes
    - UDF `df.withColumn()`, user defined function
    - `melt()` - Wide to long
    - Find duplicates and non-duplicates
  - Create sparkdf by reading
  - Databricks
  - Graph, edge, vertice, Graphframe
  - Database Connection
  - spark-install-macos
  - Reference
The records/items/elements are stored in RDD(s).
Each RDD consists of `Partitions`; each `Partition` contains a roughly equal number of items/elements.
Source: https://spark.apache.org/docs/latest/cluster-overview.html
Most Python code runs in the driver (on our local PC), except for code passed to RDD transformations.
- Transformations run on executors (in the workers),
- Actions run on both executors and the driver.
RDD Programming Guide
==> https://spark.apache.org/docs/latest/rdd-programming-guide.html
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
--- by RDD Programming Guide
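A minimal sketch of this laziness (assuming an existing `SparkContext` `sc`, as in the examples below): the transformations return immediately, and nothing is computed until the `collect()` action runs.

```python
rdd = sc.parallelize(range(10))               # no computation yet, just a recipe
squares = rdd.map(lambda x: x * x)            # transformation: lazy
evens = squares.filter(lambda x: x % 2 == 0)  # another lazy transformation
print(evens.collect())                        # action: triggers the actual computation
# [0, 4, 16, 36, 64]
```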
Spark's map, mapPartitions, and mapPartitionsWithIndex explained (Chinese article)
==> https://blog.csdn.net/QQ1131221088/article/details/104051087
Return a new distributed dataset formed by passing each element of the source through a function func.
rdd_2 = sc.parallelize(range(10), 4)
new_rdd_2 = rdd_2.map(lambda x: str(x))
print('> .map() =\n', new_rdd_2.glom().collect())
print()
#################################################
from pyspark import TaskContext
result = rdd_2.map(lambda x :x+TaskContext().partitionId())
print('> Original RDD, rdd_2.glom().collect() =\n', rdd_2.glom().collect())
print()
print('> .map() with TaskContext().partitionId() =\n', result.glom().collect())
Output:
> .map() =
[['0', '1'], ['2', '3', '4'], ['5', '6'], ['7', '8', '9']]
> Original RDD, rdd_2.glom().collect() =
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
> .map() with TaskContext().partitionId() =
[[0, 1], [3, 4, 5], [7, 8], [10, 11, 12]]
Similar to .map()
, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U>
when running on an RDD of type T.
==> Divide-and-conquer algorithm => the master node divides the RDD into partitions and distributes the partitions to workers; each worker applies the same function to its own partition. Then the master node gets back (i.e. conquers) the processed results from all workers.
(1)
rdd_2 = sc.parallelize([1,2,3,4,5,'a','b','c','d','e'], 4)

def func(itemsIteratorInPartition):
    # apply this `func` to each partition (=the whole partition) of the RDD
    yield str(itemsIteratorInPartition)

rdd_func = rdd_2.mapPartitions(func)
print('rdd_2.mapPartitions(func) =\n', rdd_func.glom().collect())
print()
Output:
rdd_2.mapPartitions(func) =
[['<itertools.chain object at 0x7ff8094580d0>'], ['<itertools.chain object at 0x7ff8094580d0>'], ['<itertools.chain object at 0x7ff8094580d0>'], ['<itertools.chain object at 0x7ff8094580d0>']]
(2)
def func_2(itemsIteratorInPartition):
    # you loop through each item in each partition of the RDD
    # = just apply this `func_2` to each item in each partition
    for item in itemsIteratorInPartition:
        yield str(item)

rdd_func_2 = rdd_2.mapPartitions(func_2)
print('rdd_2.mapPartitions(func_2) =\n', rdd_func_2.glom().collect())
Output:
rdd_2.mapPartitions(func_2) =
[['1', '2'], ['3', '4'], ['5', 'a'], ['b', 'c', 'd', 'e']]
Similar to mapPartitions
, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U>
when running on an RDD of type T.
By using mapPartitionsWithIndex you can output new elements that carry their partition index; then, when you reduce, you know which partition each element came from.
==> https://stackoverflow.com/questions/31281225/find-out-the-partition-no-id
rdd_3 = sc.parallelize(range(10), 4)
# mapPartitionsWithIndex
def func(partitionIndex, itemsIteratorInPartition):
    # apply this `func` to each partition (=the whole partition) of the RDD
    yield (partitionIndex, sum(itemsIteratorInPartition))

new_rdd_3 = rdd_3.mapPartitionsWithIndex(func)

# glom() collects the elements of each partition into a list
print('> rdd_3.glom().collect() =', rdd_3.glom().collect())
print('> new_rdd_3.glom().collect() =', new_rdd_3.glom().collect())

################################################################################

def func_2(partitionIndex, itemsIteratorInPartition):
    # you loop through each item in each partition of the RDD
    # = just apply this `func_2` to each item in each partition
    for item in itemsIteratorInPartition:
        yield str(item + partitionIndex)

new_2_rdd_3 = rdd_3.mapPartitionsWithIndex(func_2)

# glom() collects the elements of each partition into a list
print()
print('>> new_2_rdd_3.glom().collect() =', new_2_rdd_3.glom().collect())
Output:
> rdd_3.glom().collect() = [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
> new_rdd_3.glom().collect() = [[(0, 1)], [(1, 9)], [(2, 11)], [(3, 24)]]
>> new_2_rdd_3.glom().collect() = [['0', '1'], ['3', '4', '5'], ['7', '8'], ['10', '11', '12']]
print(sc.version, '\n')
py_list = [str(x) for x in range(5)]
rdd = sc.parallelize(py_list)
# map
new_rdd = rdd.map(lambda item: item+'xx')
print('.map() =\n', new_rdd.collect())
print()
# flatmap
# same as .map(), but flattens the results before returning
# i.e. removes all `list of list` / nested lists
new_rdd = rdd.flatMap(lambda item: item+'xx')
print('.flatmap() =\n', new_rdd.collect())
Output:
3.1.2
.map() =
['0xx', '1xx', '2xx', '3xx', '4xx']
.flatmap() =
['0', 'x', 'x', '1', 'x', 'x', '2', 'x', 'x', '3', 'x', 'x', '4', 'x', 'x']
==> See `.foreach()` / `.map()` v.s. `.foreach()` below.
The example below shows that there are 100 items/elements in this RDD, and that the RDD is partitioned into 4 partitions (i.e. the items are grouped into 4 partitions).
Store python list [0,1,...,99]
as RDD in Spark. This dataset is not loaded in memory. It is merely a pointer to the Python py_list
.
# Stores python list `[0,1,...,99]` as RDD in Spark
## This dataset is not loaded in memory
## It is merely a pointer to the Python `py_list`
py_list = range(100)
rdd = sc.parallelize(py_list)
print(rdd)
Output:
PythonRDD[11] at RDD at PythonRDD.scala:53
Returns the number of items in this RDD
#.count()
# Shows the number of items in this RDD
print('rdd.count()=', rdd.count())
Output:
100
Returns all the items in this RDD as python list
#.collect()
# Returns all the items in this RDD as python list
print('rdd.collect()=', rdd.collect())
Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
Returns the content of each partition as a `nested list` / `list of list`
# Returns the content of each partitions as `nested list` / `list of list`
rdd.glom().collect()
Output:
[
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24],
[25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49],
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74],
[75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
]
Returns number of partitions in this RDD
#.getNumPartitions()
# Gets number of partitions in this RDD
print('rdd.getNumPartitions()=', rdd.getNumPartitions())
Output:
4
Just executes the given function for each data element in the RDD, but returns NOTHING.
Short answer:
- `.map()`: is for transforming one RDD into another, then returns the transformed RDD.
  - Returns a new RDD by applying a function to each element of this RDD.
- `.foreach()`: is for applying an operation/function to all elements of this RDD.
  - Note: RDD = 1 collection of elements

==> is-there-a-difference-between-foreach-and-map

Long answer:

The important difference between them is that `map` accumulates all of the results into a collection, whereas `foreach` returns nothing. `map` is usually used when you want to transform a collection of elements with a function, whereas `foreach` simply executes an action for each element.

- In short, `foreach` is for applying an operation on each element of a collection of elements, whereas `map` is for transforming one collection into another.
- `foreach` works with a single collection of elements. This is the input collection.
- `map` works with two collections of elements: the input collection and the output collection.
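A tiny sketch of the difference (assuming an existing `SparkContext` `sc`): `map` gives back a new RDD you can keep working with, while `foreach` returns `None`.

```python
rdd = sc.parallelize([1, 2, 3])

mapped = rdd.map(lambda x: x * 10)          # transformation: returns a new RDD
print(mapped.collect())                     # [10, 20, 30]

returned = rdd.foreach(lambda x: print(x))  # action: runs on the executors, returns nothing
print(returned)                             # None (the print() output appears in executor logs)
```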
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd.reduce(lambda a, b: a + b) #Merge the rdd values
('a',7,'a',2,'b',2)
A transformed RDD is thrown away from memory after execution. If later transformations/actions need it, PySpark recomputes it.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
Image. RDD Without vs With .cache()
/ .persist()
P.S. Solution:
.cache()
/.persist()
the transformed RDD
A = sc.parallelize(range(1, 1000))
t = 100
B = A.filter(lambda x: x*x < t)
print('B.collect()=', B.collect()) # B.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9]
## Here: B finishes execution and is thrown away from memory
t = 200
C = B.filter(lambda x: x*x < t) # C needs B again, so recomputes B, but t=200 not =100
# So,
# B = A.filter(lambda x: x*x < 200)
# C = B.filter(lambda x: x*x < 200)
print('C.collect()=', C.collect()) # C.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
print('C.count()=', C.count()) # C.count()= 14
A = sc.parallelize(range(1, 1000))
t = 100
B = A.filter(lambda x: x*x < t)
print('B.collect()=', B.collect()) # B.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9]
# save this RDD in memory/disk
B.cache()
# B.persist()
## Here: B is in memory
t = 200
C = B.filter(lambda x: x*x < t) # C needs B again, memory stores B, NO need to recompute B
# So,
# B = previous B
# C = B.filter(lambda x: x*x < 200)
print('C.collect()=', C.collect()) # C.collect()= [1, 2, 3, 4, 5, 6, 7, 8, 9]
print('C.count()=', C.count()) # C.count()= 9
xxx
reference ==> https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34
- When you cache a DataFrame, create a new variable for it: `cachedDF = df.cache()`. This lets you bypass the problems we were solving in our example, where it is sometimes unclear what the analyzed plan is and what was actually cached. Whenever you call `cachedDF.select(...)`, it will leverage the cached data.
- Unpersist the DataFrame after it is no longer needed, using `cachedDF.unpersist()`. If the caching layer becomes full, Spark starts evicting data from memory using the LRU (least recently used) strategy, so it is good practice to use `unpersist` to stay in control of what gets evicted. Also, the more free memory there is, the more Spark can use for execution, for instance for building hash maps.
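A minimal sketch of the two practices above (assuming an existing DataFrame `df`; the column name `some_col` is made up for illustration):

```python
cachedDF = df.cache()                 # keep a dedicated variable for the cached plan

cachedDF.count()                      # first action materializes the cache
cachedDF.select("some_col").show()    # subsequent queries reuse the cached data

cachedDF.unpersist()                  # evict explicitly once the data is no longer needed
```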
https://mycupoftea00.medium.com/understanding-closure-in-spark-af6f280eebf9
https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
Q: Why is the printed counter 0?

Ans: Because
- each executor (i.e. worker node) just applies `increment_counter()` to its own copy of `counter`;
- also, `.foreach()` returns nothing.
counter = 0
rdd = sc.parallelize(range(10))
print('rdd.collect() =', rdd.collect())
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print('counter =', counter) # just print out `counter` from your driver program, not from Spark
Output:
rdd.collect() = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
counter = 0
Correct way to do the above operation:
- The `.sum()` action is executed in Spark executor(s)
- `.sum()` returns the sum value
print('rdd.sum() =', rdd.sum())
Output:
rdd.sum() = 45
Using `.accumulator()` can also solve the issue.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
def g(x):
    global accum
    accum += x
a = rdd.foreach(g)
print(accum.value)
45
Update in transformations may be applied more than once if tasks or job stages are re-executed.
rdd = sc.parallelize(range(10))
accum = sc.accumulator(0)
def g(x):
    global accum
    accum += x
    return x * x

a = rdd.map(g)
print(type(accum))
print(accum.value) # 0, because no action is present; `accum` is not computed immediately (= laziness / lazy execution)
# print(a.reduce(lambda x, y: x+y))
a.cache()
tmp = a.count()
print(accum.value) # 45
print(rdd.reduce(lambda x, y: x+y)) # 45
tmp = a.count()
print(accum.value) # 45
print(rdd.reduce(lambda x, y: x+y)) # 45
Output:
<class 'pyspark.accumulators.Accumulator'>
0   # why is it 0? Because of "lazy execution": if no action is present, `accum` is not computed immediately
45
45
45
45
https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
Note from lecture note: Suggestion: Avoid using accumulators whenever possible. Use reduce() instead.
** There are different JSON formats: single-line and multi-line
** Most useful ==> https://zhuanlan.zhihu.com/p/267353998
Another reference: https://sparkbyexamples.com/pyspark/pyspark-read-json-file-into-dataframe/
NEED MODIFY, SOMETHING WRONG
i.e. the whole record is present in a single line.

Example: a `.json` / `.json.gz` file.

`jsonFile1.json` (2 records):
[{"RecordNumber": 2, "Zipcode": 99999, "ZipCodeType": "STANDARD", "City": "99 PASEO COSTA DEL SUR", "State": "PR"},{"RecordNumber": 10, "Zipcode": 999999999, "ZipCodeType": "STANDARD", "City": "9 BDA SAN LUIS", "State": "PR"}]
`jsonFile2.json` (2 records):
[{"RecordNumber": 99999, "Zipcode": 704, "ZipCodeType": "STANDARD", "City": "xxx PASEO COSTA DEL SUR", "State": "PR"},{"RecordNumber": 999999, "Zipcode": 709, "ZipCodeType": "STANDARD", "City": "xxxx BDA SAN LUIS", "State": "PR"}]
Code:
# use `multiline = true` to read multi line JSON file
jsonFiles = ['jsonFile1','jsonFile2']
jsonFiles = [f+'.json' for f in jsonFiles]
print(jsonFiles)
df = spark.read.option('multiline', 'true').json('jsonFile*.json')
df.cache()
df.printSchema()
df.toPandas()
Output
['jsonFile1.json', 'jsonFile2.json']
root
|-- City: string (nullable = true)
|-- RecordNumber: long (nullable = true)
|-- State: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- Zipcode: long (nullable = true)
+--------------------+------------+-----+-----------+---------+
| City|RecordNumber|State|ZipCodeType| Zipcode|
+--------------------+------------+-----+-----------+---------+
|xxx PASEO COSTA D...| 99999| PR| STANDARD| 704|
| xxxx BDA SAN LUIS| 999999| PR| STANDARD| 709|
|99 PASEO COSTA DE...| 2| PR| STANDARD| 99999|
| 9 BDA SAN LUIS| 10| PR| STANDARD|999999999|
+--------------------+------------+-----+-----------+---------+
https://sparkbyexamples.com/pyspark/pyspark-parse-json-from-string-column-text-file/
file1.txt
{"Zipcode":703,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
file2.txt
{"Zipcode":299999,"ZipCodeType":"292999STANDARD","City":"PARC PARQUE","State":"PR"}
{"Zipcode":2999,"ZipCodeType":"9999999STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}
```python
# (1) read json from text file
dfFromTxt=spark.read.text("file*.txt")
dfFromTxt.printSchema()
dfFromTxt.show(truncate=False)
```
Output
```shell
root
|-- value: string (nullable = true)
+------------------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------------------+
|{"Zipcode":299999,"ZipCodeType":"292999STANDARD","City":"PARC PARQUE","State":"PR"} |
|{"Zipcode":2999,"ZipCodeType":"9999999STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}|
|{"Zipcode":703,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"} |
|{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"} |
+------------------------------------------------------------------------------------------+
```
```python
# Originally
# json_df = spark.read.json(dfFromTxt.rdd.map(lambda row: row.value))
# Explain
list_of_string = dfFromTxt.rdd.map(lambda row: row.value)
display(list_of_string.collect())
json_df = spark.read.json(list_of_string)
json_df.printSchema()
json_df.show()
```
Output
```shell
['{"Zipcode":299999,"ZipCodeType":"292999STANDARD","City":"PARC PARQUE","State":"PR"}',
'{"Zipcode":2999,"ZipCodeType":"9999999STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}',
'{"Zipcode":703,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}',
'{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PASEO COSTA DEL SUR","State":"PR"}']
root
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- Zipcode: long (nullable = true)
+-------------------+-----+---------------+-------+
| City|State| ZipCodeType|Zipcode|
+-------------------+-----+---------------+-------+
| PARC PARQUE| PR| 292999STANDARD| 299999|
|PASEO COSTA DEL SUR| PR|9999999STANDARD| 2999|
| PARC PARQUE| PR| STANDARD| 703|
|PASEO COSTA DEL SUR| PR| STANDARD| 704|
+-------------------+-----+---------------+-------+
```
- Define schema by PySpark types

# (2) Create Schema of the JSON column
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Zipcode", StringType(), True),
    StructField("ZipCodeType", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True)
])
schema

Output

StructType(List(StructField(Zipcode,StringType,true),StructField(ZipCodeType,StringType,true),StructField(City,StringType,true),StructField(State,StringType,true)))
- Convert json column to multiple columns

# (3) Convert json column to multiple columns
from pyspark.sql.functions import col, from_json

dfJSON = dfFromTxt.withColumn("jsonData", from_json(col("value"), schema)) \
                  .select("jsonData.*")
dfJSON.printSchema()
dfJSON.show(truncate=False)

Output

root
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)

+-------+---------------+-------------------+-----+
|Zipcode|ZipCodeType    |City               |State|
+-------+---------------+-------------------+-----+
|299999 |292999STANDARD |PARC PARQUE        |PR   |
|2999   |9999999STANDARD|PASEO COSTA DEL SUR|PR   |
|703    |STANDARD       |PARC PARQUE        |PR   |
|704    |STANDARD       |PASEO COSTA DEL SUR|PR   |
+-------+---------------+-------------------+-----+
==> https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/
Create schema in Dict / JSON
# schema as a Python dict (note Python booleans `True`, not JSON `true`)
schema_json: dict = {
    "type": "struct",
    "fields": [
        {
            "name": "name",
            "type": {
                "type": "struct",
                "fields": [
                    {"name": "firstname", "type": "string", "nullable": True, "metadata": {}},
                    {"name": "middlename", "type": "string", "nullable": True, "metadata": {}},
                    {"name": "lastname", "type": "string", "nullable": True, "metadata": {}}
                ]
            },
            "nullable": True,
            "metadata": {}
        },
        {"name": "dob", "type": "string", "nullable": True, "metadata": {}},
        {"name": "gender", "type": "string", "nullable": True, "metadata": {}},
        {"name": "salary", "type": "integer", "nullable": True, "metadata": {}}
    ]
}
Create schema from JSON:
import json
from pyspark.sql.types import StructType

# schemaFromJson = StructType.fromJson(json.loads(df2.schema.json()))
schemaFromJson = StructType.fromJson(schema_json)
df3 = spark.createDataFrame(
spark.sparkContext.parallelize(structureData),schemaFromJson)
df3.printSchema()
df3.show(truncate=False)
Result:
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
+--------------------+-----+------+------+
|name |id |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith] |36636|M |3100 |
|[Michael, Rose, ] |40288|M |4300 |
|[Robert, , Williams]|42114|M |1400 |
|[Maria, Anne, Jones]|39192|F |5500 |
|[Jen, Mary, Brown] | |F |-1 |
+--------------------+-----+------+------+
- Call API

import requests

url = 'https://www.als.ogcio.gov.hk/lookup'
headers = {'Accept': 'application/json',
           'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'}
params = {'q': requestAddress,
          'n': 3}

# sending get request and saving the response as response object
response = requests.get(url=url, headers=headers, params=params)
_json = response.json()
_json
Output (
list of dict
in py){'RequestAddress': {'AddressLine': ['何文田邨']}, 'SuggestedAddress': [{'Address': {'PremisesAddress': {'ChiPremisesAddress': {'BuildingName': '何文田政府合署', 'ChiDistrict': {'DcDistrict': '九龍城區'}, 'ChiEstate': {'EstateName': '何文田邨'}, 'ChiStreet': {'BuildingNoFrom': '88', 'StreetName': '忠孝街'}, 'Region': '九龍'}, 'EngPremisesAddress': {'BuildingName': 'HO MAN TIN GOVERNMENT OFFICES', 'EngDistrict': {'DcDistrict': 'KOWLOON CITY DISTRICT'}, 'EngEstate': {'EstateName': 'HO MAN TIN ESTATE'}, 'EngStreet': {'BuildingNoFrom': '88', 'StreetName': 'CHUNG HAU STREET'}, 'Region': 'KLN'}, 'GeoAddress': '3658519520T20050430', 'GeospatialInformation': {'Easting': '836597', 'Latitude': '22.31468', 'Longitude': '114.18007', 'Northing': '819521'}}}, 'ValidationInformation': {'Score': 75.0}}, {'Address': {'PremisesAddress': {'ChiPremisesAddress': {'BuildingName': '何文田廣場', 'ChiDistrict': {'DcDistrict': '九龍城區'}, 'ChiEstate': {'EstateName': '何文田邨'}, 'ChiStreet': {'BuildingNoFrom': '80', 'StreetName': '佛光街'}, 'Region': '九龍'}, 'EngPremisesAddress': {'BuildingName': 'HOMANTIN PLAZA', 'EngDistrict': {'DcDistrict': 'KOWLOON CITY DISTRICT'}, 'EngEstate': {'EstateName': 'HO MAN TIN ESTATE'}, 'EngStreet': {'BuildingNoFrom': '80', 'StreetName': 'FAT KWONG STREET'}, 'Region': 'KLN'}, 'GeoAddress': '3677919691P20060311', 'GeospatialInformation': {'Easting': '836780', 'Latitude': '22.31622', 'Longitude': '114.18184', 'Northing': '819692'}}}, 'ValidationInformation': {'Score': 75.0}}, {'Address': {'PremisesAddress': {'ChiPremisesAddress': {'BuildingName': '靜文樓', 'ChiDistrict': {'DcDistrict': '九龍城區'}, 'ChiEstate': {'EstateName': '何文田邨'}, 'ChiStreet': {'BuildingNoFrom': '68', 'StreetName': '佛光街'}, 'Region': '九龍'}, 'EngPremisesAddress': {'BuildingName': 'CHING MAN HOUSE', 'EngDistrict': {'DcDistrict': 'KOWLOON CITY DISTRICT'}, 'EngEstate': {'EstateName': 'HO MAN TIN ESTATE'}, 'EngStreet': {'BuildingNoFrom': '68', 'StreetName': 'FAT KWONG STREET'}, 'Region': 'KLN'}, 'GeoAddress': '3683619541T20050430', 'GeospatialInformation': {'Easting': '836839', 'Latitude': '22.31497', 'Longitude': '114.18242', 'Northing': '819553'}}}, 'ValidationInformation': {'Score': 62.95}} ] }
- Convert `list of dict` to PySpark RDD to Dataframe

rdd = sc.parallelize([_json])

readComplexJSONDF = spark.read.option("multiLine", "true").json(rdd)
readComplexJSONDF.show(truncate=False)
readComplexJSONDF.printSchema()
Output
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |RequestAddress|SuggestedAddress | +--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | [[何文田邨]] | [[[[[何文田政府合署, [九龍城區], [何文田邨], [88, 忠孝街], 九龍], [HO MAN TIN GOVERNMENT OFFICES, [KOWLOON CITY DISTRICT], [HO MAN TIN ESTATE], [88, CHUNG HAU STREET], KLN], 3658519520T20050430, [836597, 22.31468, 114.18007, 819521]]], [75.0]], [[[[何文田政府合署, [九龍城區], [何文田邨], [68, 佛光街], 九龍], [HO MAN TIN GOVERNMENT OFFICES, [KOWLOON CITY DISTRICT], [HO MAN TIN ESTATE], [68, FAT KWONG STREET], KLN], 3658519520T20050430, [836597, 22.31468, 114.18007, 819521]]], [75.0]], [[[[何文田廣場, [九龍城區], [何文田邨], [80, 佛光街], 九龍], [HOMANTIN PLAZA, [KOWLOON CITY DISTRICT], [HO MAN TIN ESTATE], [80, FAT KWONG STREET], KLN], 3677919691P20060311, [836780, 22.31622, 114.18184, 819692]]], [75.0]]] | +--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ root |-- RequestAddress: struct (nullable = true) | |-- AddressLine: array (nullable = true) | | |-- element: string (containsNull = true) |-- SuggestedAddress: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- Address: struct (nullable = true) | | | |-- PremisesAddress: struct (nullable = true) | | | | |-- ChiPremisesAddress: struct (nullable = true) | | | | | |-- BuildingName: string (nullable = true) | | | | | |-- ChiDistrict: struct (nullable = true) | | | | | | |-- DcDistrict: string (nullable = true) | | | | | |-- ChiEstate: struct (nullable = true) | | | | | | |-- EstateName: string (nullable = true) | | | | | |-- 
ChiStreet: struct (nullable = true) | | | | | | |-- BuildingNoFrom: string (nullable = true) | | | | | | |-- StreetName: string (nullable = true) | | | | | |-- Region: string (nullable = true) | | | | |-- EngPremisesAddress: struct (nullable = true) | | | | | |-- BuildingName: string (nullable = true) | | | | | |-- EngDistrict: struct (nullable = true) | | | | | | |-- DcDistrict: string (nullable = true) | | | | | |-- EngEstate: struct (nullable = true) | | | | | | |-- EstateName: string (nullable = true) | | | | | |-- EngStreet: struct (nullable = true) | | | | | | |-- BuildingNoFrom: string (nullable = true) | | | | | | |-- StreetName: string (nullable = true) | | | | | |-- Region: string (nullable = true) | | | | |-- GeoAddress: string (nullable = true) | | | | |-- GeospatialInformation: struct (nullable = true) | | | | | |-- Easting: string (nullable = true) | | | | | |-- Latitude: string (nullable = true) | | | | | |-- Longitude: string (nullable = true) | | | | | |-- Northing: string (nullable = true) | | |-- ValidationInformation: struct (nullable = true) | | | |-- Score: double (nullable = true)
- Explode Array to Structure

# Explode Array to Structure
explodeArrarDF = readComplexJSONDF.withColumn('Explode_SuggestedAddress', F.explode(F.col('SuggestedAddress'))) \
                                  .drop('SuggestedAddress')
explodeArrarDF.printSchema()
explodeArrarDF.show()

# Read location and name
dfReadSpecificStructure = explodeArrarDF.select(
    "Explode_SuggestedAddress.Address.PremisesAddress.ChiPremisesAddress.BuildingName",
    "Explode_SuggestedAddress.Address.PremisesAddress.ChiPremisesAddress.ChiDistrict.*",
    "Explode_SuggestedAddress.Address.PremisesAddress.ChiPremisesAddress.ChiEstate.*",
    "Explode_SuggestedAddress.Address.PremisesAddress.ChiPremisesAddress.ChiStreet.*",
    "Explode_SuggestedAddress.Address.PremisesAddress.ChiPremisesAddress.Region",
    "Explode_SuggestedAddress.Address.PremisesAddress.GeospatialInformation.*",
    "Explode_SuggestedAddress.ValidationInformation.*")
dfReadSpecificStructure.show(truncate=False)
Output
root |-- RequestAddress: struct (nullable = true) | |-- AddressLine: array (nullable = true) | | |-- element: string (containsNull = true) |-- Explode_SuggestedAddress: struct (nullable = true) | |-- Address: struct (nullable = true) | | |-- PremisesAddress: struct (nullable = true) | | | |-- ChiPremisesAddress: struct (nullable = true) | | | | |-- BuildingName: string (nullable = true) | | | | |-- ChiDistrict: struct (nullable = true) | | | | | |-- DcDistrict: string (nullable = true) | | | | |-- ChiEstate: struct (nullable = true) | | | | | |-- EstateName: string (nullable = true) | | | | |-- ChiStreet: struct (nullable = true) | | | | | |-- BuildingNoFrom: string (nullable = true) | | | | | |-- StreetName: string (nullable = true) | | | | |-- Region: string (nullable = true) | | | |-- EngPremisesAddress: struct (nullable = true) | | | | |-- BuildingName: string (nullable = true) | | | | |-- EngDistrict: struct (nullable = true) | | | | | |-- DcDistrict: string (nullable = true) | | | | |-- EngEstate: struct (nullable = true) | | | | | |-- EstateName: string (nullable = true) | | | | |-- EngStreet: struct (nullable = true) | | | | | |-- BuildingNoFrom: string (nullable = true) | | | | | |-- StreetName: string (nullable = true) | | | | |-- Region: string (nullable = true) | | | |-- GeoAddress: string (nullable = true) | | | |-- GeospatialInformation: struct (nullable = true) | | | | |-- Easting: string (nullable = true) | | | | |-- Latitude: string (nullable = true) | | | | |-- Longitude: string (nullable = true) | | | | |-- Northing: string (nullable = true) | |-- ValidationInformation: struct (nullable = true) | | |-- Score: double (nullable = true) +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |RequestAddress| Explode_SuggestedAddress | +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |[[何文田邨]] | [[[[何文田政府合署, [九龍城區], [何文田邨], [88, 忠孝街], 九龍], [HO MAN TIN GOVERNMENT OFFICES, [KOWLOON CITY DISTRICT], [HO MAN TIN ESTATE], [88, CHUNG HAU STREET], KLN], 3658519520T20050430, [836597, 22.31468, 114.18007, 819521]]], [75.0]] | |[[何文田邨]] | [[[[何文田政府合署, [九龍城區], [何文田邨], [68, 佛光街], 九龍], [HO MAN TIN GOVERNMENT OFFICES, [KOWLOON CITY DISTRICT], [HO MAN TIN ESTATE], [68, FAT KWONG STREET], KLN], 3658519520T20050430, [836597, 22.31468, 114.18007, 819521]]], [75.0]] | |[[何文田邨]] | [[[[何文田廣場, [九龍城區], [何文田邨], [80, 佛光街], 九龍], [HOMANTIN PLAZA, [KOWLOON CITY DISTRICT], [HO MAN TIN ESTATE], [80, FAT KWONG STREET], KLN], 3677919691P20060311, [836780, 22.31622, 114.18184, 819692]]], [75.0]] | +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +--------------+----------+----------+--------------+----------+------+-------+--------+---------+--------+-----+ |BuildingName |DcDistrict|EstateName|BuildingNoFrom|StreetName|Region|Easting|Latitude|Longitude|Northing|Score| +--------------+----------+----------+--------------+----------+------+-------+--------+---------+--------+-----+ |何文田政府合署 |九龍城區 
|何文田邨 |88 |忠孝街 |九龍 |836597 |22.31468|114.18007|819521 |75.0 | |何文田政府合署 |九龍城區 |何文田邨 |68 |佛光街 |九龍 |836597 |22.31468|114.18007|819521 |75.0 | |何文田廣場 |九龍城區 |何文田邨 |80 |佛光街 |九龍 |836780 |22.31622|114.18184|819692 |75.0 | +--------------+----------+----------+--------------+----------+------+-------+--------+---------+--------+-----+
https://sparkbyexamples.com/pyspark/pyspark-maptype-dict-examples/
Steps:
- JSON from API
- get `list of dict`
- PySpark dataframe with `map type`
- access PySpark MapType elements

# The nested json / list of dictionary
data_json = [
    ('James', {'hair': 'black', 'eye': 'brown'}),
    ('Michael', {'hair': 'brown', 'eye': None}),
    ('Robert', {'hair': 'red', 'eye': 'black'}),
    ('Washington', {'hair': 'grey', 'eye': 'grey'}),
    ('Jefferson', {'hair': 'brown', 'eye': ''})
]

df = spark.createDataFrame(data=data_json)
df.printSchema()
Output:

root
 |-- Name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|Name      |properties                   |
+----------+-----------------------------+
|James     |[eye -> brown, hair -> black]|
|Michael   |[eye ->, hair -> brown]      |
|Robert    |[eye -> black, hair -> red]  |
|Washington|[eye -> grey, hair -> grey]  |
|Jefferson |[eye -> , hair -> brown]     |
+----------+-----------------------------+
- Access the elements in map datatype

Method (1):

df3 = df.rdd.map(lambda x: \
    (x.name, x.properties["hair"], x.properties["eye"])) \
    .toDF(["name", "hair", "eye"])
df3.printSchema()
df3.show()

OR

Method (2):

df.withColumn("hair", df.properties.getItem("hair")) \
  .withColumn("eye", df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

# same as above
df.withColumn("hair", df.properties["hair"]) \
  .withColumn("eye", df.properties["eye"]) \
  .drop("properties") \
  .show()

Output:

root
 |-- name: string (nullable = true)
 |-- hair: string (nullable = true)
 |-- eye: string (nullable = true)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+
Data, nested JSON in `value`:
+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| key| value|
+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| UA000000107384208 | {"device":"macOS","ecommerce":{},"event_name":"checkout","event_previous_timestamp":1593880801027797,"event_timestamp":1593880822506642,"geo":{"city":"Traverse City","state":"MI"},"items":[{"item_id":"M_STAN_T","item_name":"Standard Twin Mattress","item_revenue_in_usd":595.0,"price_in_usd":595.0,"quantity":1}],"traffic_source":"google","user_first_touch_timestamp":1593879413256859,"user_id":"UA000000107384208"}
| UA000000107388621 | {"device":"Windows","ecommerce":{},"event_name":"email_coupon","event_previous_timestamp":1593880770092554,"event_timestamp":1593880829320848,"geo":{"city":"Hickory","state":"NC"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_F","item_name":"Standard Full Mattress","item_revenue_in_usd":850.5,"price_in_usd":945.0,"quantity":1}],"traffic_source":"direct","user_first_touch_timestamp":1593879889503719,"user_id":"UA000000107388621"}
| UA000000106459577 | {"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp":1593879787820475,"event_timestamp":1593879948830076,"geo":{"city":"Huntington Park","state":"CA"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_Q","item_name":"Standard Queen Mattress","item_revenue_in_usd":940.5,"price_in_usd":1045.0,"quantity":1},{"coupon":"NEWBED10","item_id":"P_DOWN_S","item_name":"Standard Down Pillow","item_revenue_in_usd":107.10000000000001,"price_in_usd":119.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593583891412316,"user_id":"UA000000106459577"}|
+-------------------+---------------------------------------------------------------------------------------------------------------------------+
SQL:
- Use `:` syntax in queries to access subfields in JSON strings
- Use `.` syntax in queries to access subfields in struct types
SELECT * FROM events_strings WHERE value:event_name = "finalize" ORDER BY key LIMIT 1
OR Python:
display(events_stringsDF
.where("value:event_name = 'finalize'")
.orderBy("key")
.limit(1)
)
Output:
+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| key| value|
+-------------------+---------------------------------------------------------------------------------------------------------------------------+
| UA000000106459577 | {"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp":1593879787820475,"event_timestamp":1593879948830076,"geo":{"city":"Huntington Park","state":"CA"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_Q","item_name":"Standard Queen Mattress","item_revenue_in_usd":940.5,"price_in_usd":1045.0,"quantity":1},{"coupon":"NEWBED10","item_id":"P_DOWN_S","item_name":"Standard Down Pillow","item_revenue_in_usd":107.10000000000001,"price_in_usd":119.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593583891412316,"user_id":"UA000000106459577"}|
+-------------------+---------------------------------------------------------------------------------------------------------------------------+
[ xxx ]
==> a columnar data format. Convenient for data analysis, because a program can read whole columns instead of scanning every row.
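A minimal sketch of writing and reading `.parquet` (assuming an existing SparkSession `spark` and some DataFrame `df`; the path and column names are made up for illustration):

```python
# write a DataFrame out as parquet
df.write.mode("overwrite").parquet("output/customers.parquet")

# read it back; the schema is stored inside the parquet files, so no inference is needed
dfParquet = spark.read.parquet("output/customers.parquet")
dfParquet.printSchema()

# only the selected columns are read from disk, thanks to the columnar layout
dfParquet.select("CUSTKEY", "ACCTBAL").show(5)
```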
customer.csv
:
CUSTKEY,NAME,ADDRESS,NATIONKEY,PHONE,ACCTBAL,MKTSEGMENT,COMMENT
1,Customer#000000001,IVhzIApeRb ot,c,E,15,25-989-741-2988,711.56,BUILDING,to the even, regular platelets. regular, ironic epitaphs nag e,
2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,13,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,
3,Customer#000000003,MG9kdTD2WBHm,1,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,
4,Customer#000000004,XxVSJsLAGtn,4,14-128-190-5944,2866.83,MACHINERY, requests. final, regular ideas sleep final accou,
...
orders.csv
:
ORDERKEY,CUSTKEY,ORDERSTATUS,TOTALPRICE,ORDERDATE,ORDERPRIORITY,CLERK,SHIPPRIORITY,COMMENT
1,370,O,172799.49,1996-01-02,5-LOW,Clerk#000000951,0,nstructions sleep furiously among ,
2,781,O,38426.09,1996-12-01,1-URGENT,Clerk#000000880,0, foxes. pending accounts at the pending, silent asymptot,
3,1234,F,205654.30,1993-10-14,5-LOW,Clerk#000000955,0,sly final accounts boost. carefully regular ideas cajole carefully. depos,
4,1369,O,56000.91,1995-10-11,5-LOW,Clerk#000000124,0,sits. slyly regular warthogs cajole. regular, regular theodolites acro,
5,445,F,105367.67,1994-07-30,5-LOW,Clerk#000000925,0,quickly. bold deposits sleep slyly. packages use slyly,
...
dfCustomer = spark.read.csv('customer.csv', header=True, inferSchema=True)
dfOrders = spark.read.csv('orders.csv', header=True, inferSchema=True)
Generate example .csv:
values = [
"""Id,RecordNumber,Zipcode,ZipCodeType,State
{Id},99999,704,STANDARD,PR
{Id},999999,563,STANDARD,PR
""",
"""Id,RecordNumber,Zipcode,ZipCodeType,State
{Id},99999,704,STANDARD,PR
{Id},999999,563,STANDARD,PR
""",
"""Id,RecordNumber,Zipcode,ZipCodeType,State
{Id},99999,704,STANDARD,PR
{Id},999999,563,STANDARD,PR
""",
]
for i, _ in enumerate(values):
    for j, value in enumerate(values):
        with open(f'file_{i}{j}.csv', 'w') as f:
            f.write(value.format(Id=f'id_{i}{j}'))
            # f.write('\n')
$ ls -l
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_00.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_01.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_02.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_10.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_11.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_12.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_20.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_21.csv
-rw-r--r-- 1 root root 99 Jul 8 03:43 file_22.csv
file_00.csv
:
Id,RecordNumber,Zipcode,ZipCodeType,State
id_00,99999,704,STANDARD,PR
id_00,999999,563,STANDARD,PR
file_01.csv
:
Id,RecordNumber,Zipcode,ZipCodeType,State
id_01,99999,704,STANDARD,PR
id_01,999999,563,STANDARD,PR
file_10.csv
:
Id,RecordNumber,Zipcode,ZipCodeType,State
id_10,99999,704,STANDARD,PR
id_10,999999,563,STANDARD,PR
file_21.csv
:
Id,RecordNumber,Zipcode,ZipCodeType,State
id_21,99999,704,STANDARD,PR
id_21,999999,563,STANDARD,PR
`[a-b]` - The character class matches a single character in the range of values. It is represented by the range of characters you want to match inside a set of brackets.

Reason why `file_1x.csv` and `file_2x.csv` are also included: the `1` and `2` in `1x` / `2x` match `[0-2]`.
filename = "file_[0-2]*.csv"
print(filename)
dfFromTxt = (spark
.read.option("header",True)
.csv(filename) #load csv
)
dfFromTxt.printSchema()
dfFromTxt.show(truncate=False)
file_[0-2]*.csv
root
|-- Id: string (nullable = true)
|-- RecordNumber: string (nullable = true)
|-- Zipcode: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- State: string (nullable = true)
+-----+------------+-------+-----------+-----+
|Id |RecordNumber|Zipcode|ZipCodeType|State|
+-----+------------+-------+-----------+-----+
|id_00|99999 |704 |STANDARD |PR |
|id_00|999999 |563 |STANDARD |PR |
|id_01|99999 |704 |STANDARD |PR |
|id_01|999999 |563 |STANDARD |PR |
|id_02|99999 |704 |STANDARD |PR |
|id_02|999999 |563 |STANDARD |PR |
|id_10|99999 |704 |STANDARD |PR |
|id_10|999999 |563 |STANDARD |PR |
|id_11|99999 |704 |STANDARD |PR |
|id_11|999999 |563 |STANDARD |PR |
|id_12|99999 |704 |STANDARD |PR |
|id_12|999999 |563 |STANDARD |PR |
|id_20|99999 |704 |STANDARD |PR |
|id_20|999999 |563 |STANDARD |PR |
|id_21|99999 |704 |STANDARD |PR |
|id_21|999999 |563 |STANDARD |PR |
|id_22|99999 |704 |STANDARD |PR |
|id_22|999999 |563 |STANDARD |PR |
+-----+------------+-------+-----------+-----+
filename = "file_{00,10}*.csv"
print(filename)
dfFromTxt = (spark
.read.option("header",True)
.csv(filename) #load csv
)
dfFromTxt.printSchema()
dfFromTxt.show(truncate=False)
file_{00,10}*.csv
root
|-- Id: string (nullable = true)
|-- RecordNumber: string (nullable = true)
|-- Zipcode: string (nullable = true)
|-- ZipCodeType: string (nullable = true)
|-- State: string (nullable = true)
+-----+------------+-------+-----------+-----+
|Id |RecordNumber|Zipcode|ZipCodeType|State|
+-----+------------+-------+-----------+-----+
|id_00|99999 |704 |STANDARD |PR |
|id_00|999999 |563 |STANDARD |PR |
|id_10|99999 |704 |STANDARD |PR |
|id_10|999999 |563 |STANDARD |PR |
+-----+------------+-------+-----------+-----+
Reading .csv/.json by a pre-defined schema can speed up data import, because Spark doesn't need to scan values in each column/attribute to auto-build the schema based on data.
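A minimal sketch of the idea, reusing the `file_*.csv` example files from above (the exact types chosen here are an assumption for illustration):

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# pre-defined schema: Spark skips the extra pass over the data that `inferSchema` would need
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("RecordNumber", IntegerType(), True),
    StructField("Zipcode", IntegerType(), True),
    StructField("ZipCodeType", StringType(), True),
    StructField("State", StringType(), True),
])

df = spark.read.option("header", True).schema(schema).csv("file_*.csv")
df.printSchema()
```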
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col3"])
df1.unionByName(df2, allowMissingColumns=True).show()
Output:
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
| 1| 2| 3|null|
|null| 4| 5| 6|
+----+----+----+----+
Steps:
- Create the missing columns for both dataframes and fill them with NULL
- UNION the two dataframes (see the sketch after the code below)

Step 1 - Create the missing columns:
import pyspark.sql.functions as F
data = [["1", "sravan", "kakumanu"],
["2", "ojaswi", "hyd"],
["3", "rohith", "delhi"],
["4", "sridevi", "kakumanu"],
["5", "bobby", "guntur"]]
columns = ['ID', 'NAME', 'Address']
dataframe1 = spark.createDataFrame(data, columns)
data = [["1", 23],
["2", 21],
["3", 32],
]
columns = ['ID', 'Age']
dataframe2 = spark.createDataFrame(data, columns)
from pyspark.sql import DataFrame

overall_columns = set(
    [*dataframe1.columns, *dataframe2.columns]
)
print(len(overall_columns), overall_columns)

def createMissingDfColumns(df: DataFrame, target_columns: list):
    _columns = list(filter(lambda _col: _col not in df.columns, target_columns))
    print(_columns)
    for column in _columns:
        df = df.withColumn(column, F.lit(None))
    return df.select(*target_columns)

dataframe1_target = createMissingDfColumns(df=dataframe1, target_columns=overall_columns)
dataframe2_target = createMissingDfColumns(df=dataframe2, target_columns=overall_columns)
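Step 2 - UNION the two aligned dataframes (a short sketch following on from the code above; both dataframes now share the same column order, so a plain `union` works):

```python
# union the two dataframes that were aligned to the same set of columns
df_union = dataframe1_target.union(dataframe2_target)
df_union.show(truncate=False)
```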
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df = df.withColumns({'age2': df.age + 2, 'age3': df.age + 3})
df.withColumnsRenamed({'age2': 'age4', 'age3': 'age5'}).show()
Output:
+---+-----+----+----+
|age| name|age4|age5|
+---+-----+----+----+
| 2|Alice| 4| 5|
| 5| Bob| 7| 8|
+---+-----+----+----+
df2 = df.withColumnRenamed(existingName, newName)
Reference: PySpark - rename more than one column using withColumnRenamed
Code:
def renameColumns(df, mapping):
    '''df: PySpark DataFrame. Return PySpark DataFrame '''
    if isinstance(mapping, dict):
        '''mapping.get(old_name, default_name)
           D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.'''
        return df.select(*[F.col(col_name).alias(mapping.get(col_name, col_name)) for col_name in df.columns])
    else:
        raise ValueError("'mapping' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")
Explain:
mapping = {"old": "new"}
print(mapping.get('old', 'default'))
print(mapping.get('xxx', 'default'))
# ==== output
# new
# default
import pyspark.sql.functions as F
df = xxxx
# lowercase all column names
df = df.toDF(*[c.lower() for c in df.columns])
import pyspark.sql.functions as F
df = xxxx
df = df.select(*[F.lower(F.col(col)).name(col) for col in df.columns])
dfCustomer.printSchema()
print(dfCustomer.count())
dfOrders.printSchema()
print(dfOrders.count())
Output:
root
|-- CUSTKEY: integer (nullable = true)
|-- NAME: string (nullable = true)
|-- ADDRESS: string (nullable = true)
|-- NATIONKEY: string (nullable = true)
|-- PHONE: string (nullable = true)
|-- ACCTBAL: string (nullable = true)
|-- MKTSEGMENT: string (nullable = true)
|-- COMMENT: string (nullable = true)
1500
root
|-- ORDERKEY: integer (nullable = true)
|-- CUSTKEY: integer (nullable = true)
|-- ORDERSTATUS: string (nullable = true)
|-- TOTALPRICE: double (nullable = true)
|-- ORDERDATE: string (nullable = true)
|-- ORDERPRIORITY: string (nullable = true)
|-- CLERK: string (nullable = true)
|-- SHIPPRIORITY: integer (nullable = true)
|-- COMMENT: string (nullable = true)
15000
==> Reference: https://stackoverflow.com/questions/54951348/pyspark-milliseconds-of-timestamp/54961415#54961415
df.withColumn('new_timestamp', F.unix_timestamp('timestamp_str_with_custom_format', format=timeFmt))
Example:
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType

timeFmt = "yyyy-MM-dd't'HH:mm:ss.SSS"
data = [
(1, '2018-07-25t17:15:06.390', '1532538906390'), # note the '390'
(2, '2018-07-25t11:12:48.883', '1532560368883')
]
df = spark.createDataFrame(data, ['ID', 'timestamp_string', 'timestamp'])
df = df.withColumn('timestamp_string_1', F.unix_timestamp('timestamp_string', format=timeFmt))\
.withColumn('timestamp_string_2', (F.col("timestamp_string_1")).cast(TimestampType()))\
.withColumn('timestamp_2', (F.col("timestamp") / 1000).cast(TimestampType()))\
.select('timestamp', 'timestamp_2', 'timestamp_string', 'timestamp_string_1', 'timestamp_string_2')
df.show(truncate=False)
df.printSchema()
Output:
+-------------+-----------------------+-----------------------+------------------+-------------------+
|timestamp |timestamp_2 |timestamp_string |timestamp_string_1|timestamp_string_2 |
+-------------+-----------------------+-----------------------+------------------+-------------------+
|1532538906390|2018-07-26 01:15:06.39 |2018-07-25t17:15:06.390|1532510106 |2018-07-25 17:15:06|
|1532560368883|2018-07-26 07:12:48.883|2018-07-25t11:12:48.883|1532488368 |2018-07-25 11:12:48|
+-------------+-----------------------+-----------------------+------------------+-------------------+
root
|-- timestamp: string (nullable = true)
|-- timestamp_2: timestamp (nullable = true)
|-- timestamp_string: string (nullable = true)
|-- timestamp_string_1: long (nullable = true)
|-- timestamp_string_2: timestamp (nullable = true)
Example:
import pyspark.sql.functions as F
timeFmt = "yyyy-MM-dd't'HH:mm:ss.SSS"
data = [
(1, '2018-07-25t17:15:06.390', '1532538906390'), # note the '390'
(2, '2018-07-25t11:12:48.883', '1532560368883')
]
df = spark.createDataFrame(data, ['ID', 'timestamp_string', 'timestamp'])
### change datetime from current timestamp
df = df.withColumn("hk_timezone", F.from_utc_timestamp(F.current_timestamp(),"Asia/Hong_Kong"))
### method 2
df = df.withColumn("existing_datetime_hk_timezone", F.from_utc_timestamp("existing_datetime", "Asia/Hong_Kong"))
Code:
import pyspark.sql.functions as F
df = df.withColumn('day_of_week', ((F.dayofweek("yyyymmddhh start")+5)%7)+1)
df = df.withColumn('is_weekday', ((F.dayofweek("yyyymmddhh start")+5)%7)+1 < 6)
Result:
df.printSchema()
df.select("journey id", "yyyymmddhh start", "day_of_week", "is_weekday").show()
root
|-- journey id: string (nullable = true)
|-- yyyymmddhh start: timestamp (nullable = true)
|-- yyyymmddhh end: timestamp (nullable = true)
|-- day_of_week: integer (nullable = true)
|-- is_weekday: boolean (nullable = true)
+----------+-------------------+-----------+----------+
|journey id| yyyymmddhh start|day_of_week|is_weekday|
+----------+-------------------+-----------+----------+
| 953|2017-09-19 17:26:00| 2| true|
| 14659|2017-09-13 19:13:00| 3| true|
| 2351|2017-09-14 14:31:00| 4| true|
| 7252|2017-09-17 17:00:00| 7| false|
| 9782|2017-09-17 17:00:00| 7| false|
| 13500|2017-09-15 13:33:00| 5| true|
| 11205|2017-09-15 15:47:00| 5| true|
+----------+-------------------+-----------+----------+
https://sparkbyexamples.com/pyspark/pyspark-convert-dataframe-columns-to-maptype-dict/
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [ ("36636","Finance",3000,"USA"),
("40288","Finance",5000,"IND"),
("42114","Sales",3900,"USA"),
("39192","Marketing",2500,"CAN"),
("34534","Sales",6500,"USA") ]
schema = StructType([
StructField('id', StringType(), True),
StructField('dept', StringType(), True),
StructField('salary', IntegerType(), True),
StructField('location', StringType(), True)
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
root
|-- id: string (nullable = true)
|-- dept: string (nullable = true)
|-- salary: integer (nullable = true)
|-- location: string (nullable = true)
+-----+---------+------+--------+
|id |dept |salary|location|
+-----+---------+------+--------+
|36636|Finance |3000 |USA |
|40288|Finance |5000 |IND |
|42114|Sales |3900 |USA |
|39192|Marketing|2500 |CAN |
|34534|Sales |6500 |USA |
+-----+---------+------+--------+
#Convert columns to Map
from pyspark.sql.functions import col,lit,create_map
df = df.withColumn("propertiesMap",create_map(
lit("salary"),col("salary"),
lit("location"),col("location")
)).drop("salary","location")
df.printSchema()
df.show(truncate=False)
root
|-- id: string (nullable = true)
|-- dept: string (nullable = true)
|-- propertiesMap: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+-----+---------+-----------------------------------+
|id |dept |propertiesMap |
+-----+---------+-----------------------------------+
|36636|Finance |{'salary': 3000, 'location': 'USA'}|
|40288|Finance |{'salary': 5000, 'location': 'IND'}|
|42114|Sales |{'salary': 3900, 'location': 'USA'}|
|39192|Marketing|{'salary': 2500, 'location': 'CAN'}|
|34534|Sales |{'salary': 6500, 'location': 'USA'}|
+-----+---------+-----------------------------------+
Find the count of orders each customer (`CUSTKEY`) has:
dfOrders_groupby = dfOrders.groupBy('CUSTKEY').count()
dfOrders_groupby.toPandas()
Output:
CUSTKEY count
0 463 20
1 1342 20
2 496 18
3 148 15
4 1088 7
... ... ...
995 401 12
996 517 25
997 422 12
998 89 7
999 1138 23
1000 rows × 2 columns
[ xxxx ]
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.agg.html
- https://spark.apache.org/docs/2.4.4/sql-pyspark-pandas-with-arrow.html#grouped-map
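A small sketch of `groupBy().agg()`, reusing `dfOrders` from above (the alias names are made up for illustration):

```python
import pyspark.sql.functions as F

dfOrders_agg = (dfOrders
                .groupBy('CUSTKEY')
                .agg(F.count('ORDERKEY').alias('n_orders'),        # number of orders per customer
                     F.sum('TOTALPRICE').alias('total_spent'),     # total order value per customer
                     F.max('ORDERDATE').alias('last_order_date'))) # most recent order date
dfOrders_agg.show(5)
```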
[ xxx ]
from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import MapType, StringType

combineMap = udf(lambda maps: {key: f[key] for f in maps for key in f},
                 MapType(StringType(), StringType()))

df.groupBy('id')\
  .agg(collect_list('map')\
  .alias('maps'))\
  .select('id', combineMap('maps').alias('combined_map')).show()
`df.createOrReplaceTempView("sql_table")` allows running SQL queries once `df` is registered as a temporary table.
dfCustomer.createOrReplaceTempView("customer")
dfOrders_groupby.createOrReplaceTempView("orders")

# Can run SQL query on the registered views
df = spark.sql("SELECT customer.CUSTKEY, orders.count FROM customer LEFT OUTER JOIN orders ON customer.CUSTKEY = orders.CUSTKEY")
df.toPandas()
Output:
CUSTKEY count
0 1 9.0
1 2 10.0
2 3 NaN
3 4 31.0
4 5 9.0
... ... ...
1495 1496 9.0
1496 1497 NaN
1497 1498 20.0
1498 1499 21.0
1499 1500 NaN
1500 rows × 2 columns
https://sparkbyexamples.com/pyspark/pyspark-window-functions/
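A minimal sketch of a window function, reusing `dfOrders` from above (ranking each customer's orders by price; the column name `order_rank` is made up for illustration):

```python
from pyspark.sql import Window
import pyspark.sql.functions as F

# one window per customer, most expensive order first
w = Window.partitionBy('CUSTKEY').orderBy(F.desc('TOTALPRICE'))

dfRanked = dfOrders.withColumn('order_rank', F.row_number().over(w))
dfRanked.select('CUSTKEY', 'ORDERKEY', 'TOTALPRICE', 'order_rank').show(5)
```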
# join 2 df by `CUSTKEY`
from pyspark.sql.functions import asc, desc

joined_df = dfCustomer.join(dfOrders, on='CUSTKEY', how='leftouter')
df2 = joined_df.select('CUSTKEY', 'ORDERKEY').sort(asc('CUSTKEY'), desc('ORDERKEY'))  # ascending by 'CUSTKEY', descending by 'ORDERKEY'
df2.toPandas()  # view
`how`: str, optional. Default `inner`. Must be one of: `inner`, `cross`, `outer`, `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti` and `left_anti`.
dfOrders.createOrReplaceTempView("orders")
dfCustomer.createOrReplaceTempView("customer")
# Can run SQL query on it
df2 = spark.sql("SELECT customer.CUSTKEY, orders.ORDERKEY FROM customer left outer join orders on customer.CUSTKEY = orders.CUSTKEY")
df2.toPandas()
Output:
CUSTKEY ORDERKEY
0 1 53283.0
1 1 52263.0
2 1 43879.0
3 1 36422.0
4 1 34019.0
... ... ...
15495 1499 7301.0
15496 1499 3523.0
15497 1499 1472.0
15498 1499 1252.0
15499 1500 NaN
15500 rows × 2 columns
The dataframes need to have identical columns (same number and order); if they don't, you can use `withColumn()` to create the missing columns (e.g. `normal_1` and `normal_2`) before the union.
df_concat = df_1.union(df_2)
https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87
Explicitly define a udf that you can use as a pyspark function.
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
def say_hello(name : str) -> str:
    return f"Hello {name}"
assert say_hello("Summer") == "Hello Summer"
say_hello_udf = udf(lambda name: say_hello(name), StringType())
df = spark.createDataFrame([("Rick",), ("Morty",)], ["name"])
df.withColumn("greetings", say_hello_udf(col("name"))).show()
# +------+------------+
# | name| greetings|
# +------+------------+
# | Rick| Hello Rick|
# | Morty| Hello Morty|
# +------+------------+
However, this means that for every pyspark UDF, there are two functions to keep track of — a regular python one and another pyspark _udf
one. For a cleaner pattern, the udf function is also a built in decorator.
https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87
@udf(returnType=StringType())
def say_hello(name):
    return f"Hello {name}"
# Below `assert` doesn't work anymore if decorator `@udf` is used
# assert say_hello("Summer") == "Hello Summer"
df.withColumn("greetings", say_hello(col("name"))).show()
https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87
Introducing `py_or_udf`, a decorator that allows a method to act as either a regular Python method or a PySpark UDF.
from typing import Callable
from pyspark.sql import Column
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType, DataType
class py_or_udf:
def __init__(self, returnType : DataType=StringType()):
self.spark_udf_type = returnType
def __call__(self, func : Callable):
def wrapped_func(*args, **kwargs):
if any([isinstance(arg, Column) for arg in args]) or \
any([isinstance(vv, Column) for vv in kwargs.values()]):
return udf(func, self.spark_udf_type)(*args, **kwargs)
else:
return func(*args, **kwargs)
return wrapped_func
@py_or_udf(returnType=StringType())
def say_hello(name):
return f"Hello {name}"
# This works
assert say_hello("world") == "Hello world"
# This also works
df.withColumn("greeting", say_hello(col("name"))).show()
- https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe
- https://gist.github.com/korkridake/972e315e5ce094096e17c6ad1ef599fd
Code example:
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable
def melt(
df: DataFrame,
id_vars: Iterable[str], value_vars: Iterable[str],
var_name: str="variable", value_name: str="value") -> DataFrame:
"""
Convert :class:`DataFrame` from wide to long format.
Source: https://stackoverflow.com/questions/41670103/how-to-melt-spark-dataframe
"""
# -------------------------------------------------------------------------------
# Create array<struct<variable: str, value: ...>>
# -------------------------------------------------------------------------------
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# -------------------------------------------------------------------------------
# Add to the DataFrame and explode
# -------------------------------------------------------------------------------
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
# -------------------------------------------------------------------------------
# Let's Implement Wide to Long in Pyspark!
# -------------------------------------------------------------------------------
melt(df_web_browsing_full_test,
id_vars=['ID_variable'],
value_vars=['VALUE_variable_1', 'VALUE_variable_2'],
var_name='variable',
value_name='value',
).show()
Result:
+-----------+--------+----------------+
|ID_variable|variable| value|
+-----------+--------+----------------+
| id00000001| 2023-01| 0.0|
| id03947263| 2023-02| 488.49382|
| id58723942| 2023-03| 8644.84643|
| id09474733| 2023-04| 1431.49900|
| id00012398| 2023-05| 0.0|
+-----------+--------+----------------+
The exceptAll()
function in PySpark is used to return a new DataFrame containing rows in the first DataFrame but not in another DataFrame, while preserving duplicates. This is equivalent to EXCEPT ALL
in SQL. As standard in SQL, this function resolves columns by position (not by name)¹.
Here's an example:
# Create two DataFrames
df1 = spark.createDataFrame(
[("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b", 3), ("c", 4)], ["C1", "C2"]
)
df2 = spark.createDataFrame([("a", 1), ("b", 3)], ["C1", "C2"])
# Use exceptAll() function
result = df1.exceptAll(df2)
# Show the result
result.show()
Output:
+---+---+
| C1| C2|
+---+---+
| a| 1|
| a| 1|
| a| 2|
| c| 4|
+---+---+
In this example, the exceptAll()
function returns all rows from df1
that are not in df2
, while preserving duplicates¹. So, even though the value ("a", 1)
appears three times in df1
and once in df2
, two instances of it are still present in the resulting DataFrame. This is because exceptAll()
preserves duplicates.
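Also note that exceptAll() resolves columns by position, not by name, so two DataFrames with the same column names but a different column order are not realigned. A minimal sketch of that pitfall (the toy DataFrames here are illustrative):
# exceptAll() aligns columns by position, not by name
dfA = spark.createDataFrame([("x", "1"), ("y", "2")], ["C1", "C2"])
dfB = spark.createDataFrame([("1", "x")], ["C2", "C1"])  # same names, swapped order
dfA.exceptAll(dfB).show()  # keeps both rows: ("x", "1") is compared against ("1", "x") positionally, never by column name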
Source: Conversation with Bing, 18/04/2024
(1) pyspark.sql.DataFrame.exceptAll — PySpark master documentation. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.exceptAll.html
(2) pyspark.sql.DataFrame.exceptAll — PySpark master documentation. https://api-docs.databricks.com/python/pyspark/latest/pyspark.sql/api/pyspark.sql.DataFrame.exceptAll.html
(3) PySpark: Get Rows Which Are Not in Another DataFrame. https://www.statology.org/pyspark-get-rows-not-in-another-dataframe/
(4) github.com. https://github.com/PandaGege/big_data/tree/eb96d4439611337f4d95764b65ccbd13a270b899/spark%2Fsql_dataframe_guide.py
(5) github.com. https://github.com/bhishanpdl/Big_Data_Analysis/tree/b4aec3a0e285de5cac02ad7390712635a73a24db/a01_PySpark%2Fa01_Pyspark_Basics%2FREADME.md
(6) github.com. https://github.com/Sapphirine/Esport-Game-Prediction/tree/243a729cd395a4fb63a800ac88a13b55bdbd3989/dashboard%2Fvenv%2Flib%2Fpython2.7%2Fsite-packages%2Fpyspark%2Fsql%2Fdataframe.py
(2) subtract()
- Is this the same as "subtract()"? Can I use join (but which type of join) to achieve this?
The subtract()
function in PySpark is similar to exceptAll()
, but there's a key difference: subtract()
is equivalent to EXCEPT DISTINCT
in SQL¹. This means that subtract()
removes all duplicates. If a row appears in both DataFrames, it will be removed entirely from the first DataFrame¹.
Here's an example:
# Create two DataFrames
df1 = spark.createDataFrame(
[("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"]
)
df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# Use subtract() function
result = df1.subtract(df2)
# Show the result
result.show()
Output:
+---+---+
| C1| C2|
+---+---+
| c| 4|
+---+---+
In this example, the subtract()
function removes all instances of ("a", 1)
from df1
because it appears in df2
¹.
As for using a join operation to achieve a similar result to exceptAll(), you can use a LEFT ANTI JOIN. This type of join returns only the rows from the left DataFrame that are not present in the right DataFrame². Here's how you can do it:
# Use left anti join
result = df1.join(df2, ["C1", "C2"], "left_anti")
# Show the result
result.show()
Note that this is not exactly the same as df1.exceptAll(df2): a left anti join drops every copy of a row that has a match in df2 (so for the example above it returns the same rows as df1.subtract(df2)), whereas exceptAll() keeps the surplus duplicates. Also, this method requires the join to be performed on all columns; if you want to perform the operation based on specific columns, you would need to adjust the join condition accordingly².
Source: Conversation with Bing, 18/04/2024
(1) pyspark.sql.DataFrame.subtract — PySpark master documentation. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.subtract.html
(2) Spark: subtract two DataFrames - Stack Overflow. https://stackoverflow.com/questions/29537564/spark-subtract-two-dataframes
(3) python - Subtract in pyspark dataframe - Stack Overflow. https://stackoverflow.com/questions/57483536/subtract-in-pyspark-dataframe
(4) How to subtract two columns of pyspark dataframe and also divide? https://stackoverflow.com/questions/57105403/how-to-subtract-two-columns-of-pyspark-dataframe-and-also-divide
(5) pyspark.sql.DataFrame.exceptAll — PySpark master documentation. https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.exceptAll.html
(6) Keeping identifier in exceptAll in PySpark - Stack Overflow. https://stackoverflow.com/questions/59762501/keeping-identifier-in-exceptall-in-pyspark
(7) pyspark.sql.DataFrame.exceptAll — PySpark master documentation. https://api-docs.databricks.com/python/pyspark/latest/pyspark.sql/api/pyspark.sql.DataFrame.exceptAll.html
(8) github.com. https://github.com/intel-analytics/BigDL/tree/11a0600676f8c400fa2686f5a52d70dcc754b728/ppml%2Ftrusted-big-data-ml%2Fpython%2Fdocker-graphene%2Fexamples%2Fsql_example.py
To get the duplicates and non-duplicates using a "left anti" join in PySpark, you can follow these steps:
- Non-Duplicates: The result of a "left anti" join itself gives you the non-duplicate records. These are the records that are present in the left DataFrame but not in the right DataFrame².
non_duplicates = df1.join(df2, ["C1", "C2"], "left_anti")
non_duplicates.show()
- Duplicates: To get the duplicate records, you can first find all records in the left DataFrame, then subtract the non-duplicates from it³.
all_records = df1
duplicates = all_records.subtract(non_duplicates)
duplicates.show()
In this example, all_records
is a DataFrame that contains all records from df1
. non_duplicates
is the DataFrame we obtained from the "left anti" join, which contains all records from df1
that are not in df2
. The subtract()
function then gives us all records from all_records
that are not in non_duplicates
, which are the duplicate records³.
Please note that this method considers all columns for identifying duplicates. If you want to consider specific columns, you would need to adjust the join and subtract conditions accordingly³. Also, be aware that null values are treated as distinct in PySpark, so you might need to handle them separately if your data contains null values¹.
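A minimal sketch of both points, assuming duplicates should be decided on C1 only and that C1 may contain nulls (eqNullSafe treats two nulls as equal, unlike a plain equality join):
# decide duplicates on a specific column only (here: C1), treating nulls as equal
cond = df1["C1"].eqNullSafe(df2["C1"])
duplicates_on_c1     = df1.join(df2, cond, "left_semi")  # rows of df1 whose C1 also appears in df2
non_duplicates_on_c1 = df1.join(df2, cond, "left_anti")  # rows of df1 whose C1 does not appear in df2
duplicates_on_c1.show()
non_duplicates_on_c1.show()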
Source: Conversation with Bing, 18/04/2024
(1) PySpark SQL Left Anti Join with Example - Spark By Examples. https://sparkbyexamples.com/pyspark/pyspark-sql-left-anti-join-with-example/
(2) Solved: How to get all occurrences of duplicate records in .... https://community.databricks.com/t5/data-engineering/how-to-get-all-occurrences-of-duplicate-records-in-a-pyspark/td-p/19818
(3) pyspark - Left Anti join not consider null as duplicate values in Spark - Stack Overflow. https://stackoverflow.com/questions/62423032/left-anti-join-not-consider-null-as-duplicate-values-in-spark
(4) Spark SQL Left Anti Join with Example - Spark By {Examples}. https://sparkbyexamples.com/spark/spark-sql-left-anti-join-with-example/
(5) Spark SQL NULL Semantics. https://spark.apache.org/docs/3.0.0-preview/sql-ref-null-semantics.html
- https://docs.databricks.com/en/connect/storage/azure-storage.html
- https://learn.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes
Need to store secrets (=key-value pair) in Azure Key Vault first, then create a secret scope backed by Azure Key Vault in Databricks.
Because the Azure Key Vault-backed secret scope is a read-only interface to the Key Vault, the PutSecret and DeleteSecret operations of the Secrets API are not allowed.
service_credential = dbutils.secrets.get(scope="<secret-scope>",key="<service-credential-key>")
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
Replace
- `<secret-scope>` with the Databricks secret scope name.
- `<service-credential-key>` with the name of the key containing the client secret.
- `<storage-account>` with the name of the Azure storage account.
- `<application-id>` with the Application (client) ID for the Microsoft Entra ID application.
- `<directory-id>` with the Directory (tenant) ID for the Microsoft Entra ID application.
spark.read.load("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-data>")
dbutils.fs.ls("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-data>")
### from mount point
import pandas as pd
pd.read_csv("file:/dbfs/mnt/ext-folder/UAT/logo/log/tmp.csv")
# Define the file path and string content
file_path = "/mnt/databricks-mount/your_file_name.txt"
content = "Your string content"
# Save the string content as a .txt file in Azure Blob Storage
dbutils.fs.put(file_path, content, overwrite=True)
print("File uploaded successfully.")
import nest_asyncio ### need to add if using async in Databricks
nest_asyncio.apply() ### https://community.databricks.com/t5/data-engineering/asynchronous-api-calls-from-databricks/td-p/4691
################################
import os
import aiohttp
import asyncio
from azure.storage.blob.aio import BlobServiceClient
async def download_image(session, url, destination):
async with session.get(url) as response:
with open(destination, 'wb') as file:
while True:
chunk = await response.content.read(8192)
if not chunk:
break
file.write(chunk)
async def upload_image(blob_client, source):
    # blob_client already points at the destination blob (it comes from get_blob_client()),
    # so upload_blob() takes no blob name argument
    with open(source, "rb") as data:
        await blob_client.upload_blob(data, blob_type="BlockBlob", overwrite=True)
async def download_images_and_upload_to_azure(urls, destination_folder, connection_string, container_name):
    # make sure the local destination folder exists before downloading into it
    os.makedirs(destination_folder, exist_ok=True)
    tasks = []
async with aiohttp.ClientSession() as session:
for i, url in enumerate(urls):
destination = f"{destination_folder}/image_{i + 1}.jpg"
tasks.append(download_image(session, url, destination))
await asyncio.gather(*tasks)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
for i, url in enumerate(urls):
source = f"{destination_folder}/image_{i + 1}.jpg"
destination = f"images/image_{i + 1}.jpg"
await upload_image(container_client.get_blob_client(destination), source)
async def main():
# Define the list of image URLs
image_urls = [
"https://commons.wikimedia.org/wiki/File:ChatGPT_logo.svg#/media/File:ChatGPT_logo.svg",
"https://commons.wikimedia.org/wiki/File:Chatgpt_idwiktionary.jpg#/media/File:Chatgpt_idwiktionary.jpg",
"https://upload.wikimedia.org/wikipedia/commons/4/4d/OpenAI_Logo.svg"
]
# Define the destination folder to save the downloaded images
destination_folder = "downloaded_images"
# Define your Azure Storage connection string and container name
connection_string = "<your_connection_string>"
container_name = "<your_container_name>"
# Start the download and upload process
await download_images_and_upload_to_azure(image_urls, destination_folder, connection_string, container_name)
print("Images downloaded and uploaded successfully.")
# Run the asyncio event loop
asyncio.run(main())
Credit to link
# Vertices DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 37),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 38),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edges DataFrame
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"), # b and c follow each other
("c", "b", "follow"), #
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend"),
("g", "e", "follow")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
g.vertices.show()
g.edges.show()
# Vertices DataFrame
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 37),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 38),
("g", "Gabby", 60)
], ["id", "name", "age"])
# Edges DataFrame
e = spark.createDataFrame([
("a", "b", "follow"),
("c", "a", "friend"),
("b", "c", "follow"),
("d", "a", "follow"),
("f", "c", "follow"),
("f", "d", "follow"),
("f", "b", "follow"),
("c", "d", "follow"),
("g", "a", "friend"),
("g", "d", "friend"),
("g", "c", "friend"),
("e", "a", "follow"),
("e", "d", "follow")
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
Credit to link
g.triplets.show() # display all
g.vertices.show() # display vertices
g.edges.show() # display edges
g.degrees.show()
g.inDegrees.show()
g.outDegrees.show()
Returns GraphFrame
, not DataFrame
.
Credit to link
g.filterVertices("columnName > 30")
g.filterEdges("columnName = 30")
g.dropIsolatedVertices() #Drop isolated vertices (users) which are not contained in any edges (relationships).
#Vertices without incoming / outgoing edges
Find the edges `e` from vertex `a` to vertex `b`. P.S. `.find()` returns a Spark DataFrame.
Credit to link
g.find("(a)-[]->(b);(b)-[]->(a)").filter("a.id < b.id") # A and B follow/friend each other;
# .filter() out "B follows/friends back A" rows,
# just keeps "A follows/friends B" rows
g.find("(a)-[]->(b); !(b)-[]->(a)").filter("a.id < b.id") # jsut A follows B, B not follows A
g.find("!()-[]->(a)") # find vertices without incoming edges
g.find("(a)-[e]->(b)").filter("e.relationship = 'follow'") # find A follows B,
Credit to msbd5003
.
# Build subgraph based on conditions, i.e. subgraph contains (v,e)
# Select subgraph of users older than 30, and relationships of type "friend".
# Drop isolated vertices (users) which are not contained in any edges (relationships).
g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'")\
.dropIsolatedVertices()
g1.vertices.show()
g1.edges.show()
Output:
+---+------+---+
| id| name|age|
+---+------+---+
| e|Esther| 32|
| b| Bob| 36|
| a| Alice| 34|
+---+------+---+
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| a| e| friend|
| a| b| friend|
+---+---+------------+
Reading and writing to SQL Server:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("database-read-write") \
.getOrCreate()
url = "jdbc:sqlserver://<hostname>:<port>;database=<database_name>"
properties = {
"user": "<username>",
"password": "<password>",
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
table_name = "<table_name>"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)
mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)
Setup:
driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
"--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell".format(driverPath)
)
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType
spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()
JDBC connection logic:
# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
import os
import os.path
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "OURCONNECTIONURL;") \
.option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
.option("dbtable", tableName) \
.option("user", "USERNAME") \
.option("password", "PASSWORD") \
.load()
filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
jdbcDF.write.parquet(filePath)
fileExists = os.path.exists(filePath)
if(fileExists):
return (filePath + " exists!")
else:
return (filePath + " could not be written!")
Multi-thread read Database:
from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close()
pool.join()
print(*results, sep='\n')
Output:
C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!
Run the following in the macOS terminal:

- Modify the PATH variables:

  $ nano ~/.bashrc

- Add the following lines in ~/.bashrc, so spark and jupyter notebook can be launched at the same time:

  # Setting PATH for Spark 3.1.2
  export SPARK_HOME=/usr/local/Cellar/apache-spark/3.1.2/libexec
  export PATH="$SPARK_HOME/bin/:$PATH"
  export PYSPARK_DRIVER_PYTHON="jupyter"
  export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
In terminal:

- Update the $PATH variable:

  $ source ~/.bashrc

- Ensure graphframes-0.8.1-spark3.0-s_2.12.jar is present in the /Users/<USER_NAME>/.ivy2/jars folder.

- Start pyspark with graphframes in terminal:

  ==> Needs 2 flags, --packages and --jars
  ==> Ensure the graphframes package name is the same as graphframes-0.8.1-spark3.0-s_2.12.jar in the folder.
  => Deals with the error: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
  => https://blog.csdn.net/qq_42166929/article/details/105983616

  $ pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 --jars graphframes-0.8.1-spark3.0-s_2.12.jar
Terminal output:
Jupyter Notebook:
[Still need to investigate]
https://github.com/cenzwong/tech/tree/master/Note/Spark#graphframe
Set MongoDB connection when create Spark session in Python code.
URL = "mongodb://{USER_NAME}:{PASSWORD}@127.0.0.1:{PORT}/test.application_test?readPreference=primaryPreferred?authMechanism={SCRAM-SHA-1}"
spark = (
SparkSession.builder.master("local[*]")
.appName("TASK_NAME")
.config("spark.mongodb.input.uri", URI)
.config("spark.mongodb.output.uri", URI)
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
.config("spark.jars.packages", "org.mongodb:mongo-java-driver:x.x.x")
.getOrCreate()
)
Terminal:
pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 --jars graphframes-0.8.1-spark3.0-s_2.12.jar \
--conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.application_test?readPreference=primaryPreferred" \
--conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.application_test" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
Notebook:
from datetime import datetime
from pymongo import MongoClient
def _saveDfToMongoDB(sparkDF, mongoCollection):
sparkDF.cache()
print(f"Storing {sparkDF.count()} {mongoCollection} to db")
start = datetime.now()
sparkDF.write.format("mongo") \
.mode("append") \
.option("database", "msbd5003") \
.option("collection", mongoCollection) \
.save()
end = datetime.now()
spent = (end - start).total_seconds()
print(f"Stored, time used: {spent} s")
df = spark.read.json("file path")
_saveDfToMongoDB(df, "mongodb collection name")
Inside Jupyter Notebook:
==> Reference: https://medium.com/@roshinijohri/spark-with-jupyter-notebook-on-macos-2-0-0-and-higher-c61b971b5007
Cell 1:
print(sc.version)
Output 1:
3.1.2
OR
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from graphframes import *
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc
Output:
SparkSession - hive
SparkContext

Spark UI
Version: v3.1.2
Master: local[*]
AppName: PySparkShell
Cell 2:
path_to_the_jar_file = 'graphframes-0.8.1-spark3.0-s_2.12.jar'
sc.addPyFile(path_to_the_jar_file)
Cell 3:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
v = sqlContext.createDataFrame([("a", ),("b", ),], ["id", ])
e = sqlContext.createDataFrame([("a", "b"),], ["src", "dst"])
from graphframes import *
g = GraphFrame(v, e)
g.inDegrees.show()
Output 3:
+---+--------+
| id|inDegree|
+---+--------+
| b| 1|
+---+--------+
Cell 4:
import pyspark
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("spark test").getOrCreate()
columns = ['id', 'dogs', 'cats']
vals = [
(1, 2, 0),
(2, 0, 1)
]
# create DataFrame
df = spark.createDataFrame(vals, columns)
df.show()
Output 4:
+---+----+----+
| id|dogs|cats|
+---+----+----+
| 1| 2| 0|
| 2| 0| 1|
+---+----+----+
-
Official pyspark example: https://github.com/spark-examples/pyspark-examples