
In [1]:
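# Install a Java runtime (required by Spark) and the PySpark package
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Start a local Spark session (one worker thread) and get its SparkContext
spark = SparkSession.builder.master("local").getOrCreate()
sc = SparkContext.getOrCreate()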


In [2]:
spark

## 2. Creating an RDD

In [3]:
rdd = sc.parallelize(["Hello Spark"])
counts = rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print(counts)

[('Hello', 1), ('Spark', 1)]


An RDD (Resilient Distributed Dataset) is PySpark's fundamental data structure: an immutable, partitioned, distributed collection of data. You can create an RDD from a Python list with the parallelize() method:

In [5]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd

ParallelCollectionRDD[7] at readRDDFromFile at PythonRDD.scala:289


In [11]:
data = rdd.collect()
print(data)

[1, 2, 3, 4, 5]


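## 3. Running Transformations and Actions

PySpark provides many transformations and actions for working with RDDs. A transformation creates a new RDD and is evaluated lazily, while an action triggers the computation and returns a result. Some common transformations and actions:

* map(): a transformation that applies a function to every element of the RDD.
* filter(): a transformation that keeps only the elements matching a condition.
* reduce(): an action that combines the elements with a binary function into a single value.
* collect(): an action that returns all elements of the RDD to the driver program.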

In [16]:
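# Transformation: square every element
squared_rdd = rdd.map(lambda x: x * x)
print(squared_rdd.collect())
# Transformation: keep only the even elements
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect())
# Action: reduce() returns a plain Python value, not an RDD
total = rdd.reduce(lambda x, y: x + y)
print(total)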

[1, 4, 9, 16, 25]
[2, 4]
15


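## 4. Creating a DataFrame

A DataFrame is a data structure organized as a table, similar to a pandas DataFrame. You can create one from an RDD or from other data sources, such as a Python list of tuples, with the createDataFrame() method: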

In [7]:
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

In [9]:
df.show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



## 5. Running SQL Queries

In [17]:
df.createOrReplaceTempView("people")
result = spark.sql("SELECT * FROM people WHERE age > 30")

In [18]:
result.show()

+-------+---+
|   name|age|
+-------+---+
|Charlie| 35|
+-------+---+



## 6. Saving Data

In [19]:
# Spark writes a directory named "people.csv" containing part files, not a single file
df.write.format("csv").save("people.csv")