# PySpark 函式與用法示例

## 1. SparkSession

In [None]:

from pyspark.sql import SparkSession

# 初始化 SparkSession，這是 PySpark 的入口點
spark = SparkSession.builder.appName("範例應用").getOrCreate()

# 獲取 SparkContext（舊版入口點）
sc = spark.sparkContext

# 停止 SparkSession
spark.stop()


## 2. DataFrame API

In [None]:

# 建立 DataFrame
data = [(1, "Alice", 29), (2, "Bob", 31)]  # 範例數據
columns = ["id", "name", "age"]            # 欄位名稱
df = spark.createDataFrame(data, columns)  # 建立 DataFrame

# 顯示數據
df.show()                 # 顯示前 20 行數據
df.printSchema()          # 顯示數據結構
df.columns                # 獲取所有列名
df.dtypes                 # 獲取列名與資料型別


In [None]:

# 過濾數據，篩選年齡大於 30 的資料
filtered_df = df.filter(df.age > 30)

# 選取特定欄位
selected_df = df.select("name", "age")

# 排序數據，根據年齡降序排序
sorted_df = df.orderBy("age", ascending=False)


In [None]:

from pyspark.sql.functions import avg, count

# 分組計算：計算每個年齡的數量
df.groupBy("age").count().show()

# 聚合函式：計算年齡的平均值
df.agg(avg("age").alias("平均年齡")).show()


## 3. SQL 操作

In [None]:

# 建立臨時表
df.createOrReplaceTempView("people")

# 執行 SQL 查詢
result = spark.sql("SELECT name, age FROM people WHERE age > 30")
result.show()


## 4. RDD（彈性分布式數據集）

In [None]:

# 創建 RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# 使用 Map 與 Filter 函式
mapped_rdd = rdd.map(lambda x: x * 2)  # 將每個元素乘以 2
filtered_rdd = rdd.filter(lambda x: x > 3)  # 過濾大於 3 的元素

# 收集結果並打印
collected = filtered_rdd.collect()
print(collected)  # [4, 5]


In [None]:

# 使用 Reduce 函式計算總和
sum_rdd = rdd.reduce(lambda x, y: x + y)

# 基本聚合操作
count = rdd.count()       # 計算元素個數
max_value = rdd.max()     # 找出最大值


## 5. DataFrame 函式

In [None]:

from pyspark.sql.functions import col, lit, when

# 新增欄位，將年齡乘以 2
df = df.withColumn("新欄位", df.age * 2)

# 條件運算，新增欄位判斷是否成年
df = df.withColumn("是否成年", when(col("age") >= 18, lit(True)).otherwise(lit(False)))

# 字符串處理，將名稱轉為大寫
from pyspark.sql.functions import upper, lower
df = df.withColumn("名稱大寫", upper(col("name")))


## 6. MLlib（機器學習庫）

In [None]:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# 創建特徵向量
assembler = VectorAssembler(inputCols=["age"], outputCol="features")
data = assembler.transform(df)

# 建立線性回歸模型並訓練
lr = LinearRegression(featuresCol="features", labelCol="age")
model = lr.fit(data)


## 7. Streaming（數據流處理）

In [None]:

from pyspark.streaming import StreamingContext

# 創建 StreamingContext，每秒處理一次數據
ssc = StreamingContext(sc, 1)

# 創建數據流
lines = ssc.socketTextStream("localhost", 9999)

# 數據處理：統計單詞出現次數
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
word_counts.pprint()

# 啟動數據流
ssc.start()
ssc.awaitTermination()


## 8. GraphFrames（圖計算）

In [None]:

from graphframes import GraphFrame

# 定義節點與邊
vertices = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
edges = spark.createDataFrame([(1, 2)], ["src", "dst"])

# 創建圖
g = GraphFrame(vertices, edges)

# 運行 PageRank 演算法
result = g.pageRank(resetProbability=0.15, maxIter=10)
result.vertices.show()
