# Install dependencies

In [2]:
#!nvidia-smi

In [7]:
#!pip3 install --upgrade pip

In [9]:
# 安装 pytorch (gpu)
#!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [10]:
# 安装依赖包
#!pip3 install numpy pandas seaborn matplotlib scipy scikit-learn tensorflow

# 初始化 PySpark

In [None]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, greatest, lit, abs
from configparser import ConfigParser
from pyspark.sql.functions import sum as spark_sum

# Point PySpark at JDK 17. This must be set before the first SparkSession is
# built, because that is when the JVM gets launched.
os.environ["JAVA_HOME"] = "/lib/jvm/java-17-openjdk-amd64"

print(pyspark.__version__)

# Collect every *.jar in the jars directory and pass them to Spark via
# `spark.jars`. os.listdir() returns entries in arbitrary order, so sort the
# list to keep the classpath order deterministic across runs (this matters
# when two jars ship overlapping classes).
jars_dir = "/home/jovyan/jars"
jars_list = sorted(
    os.path.join(jars_dir, name)
    for name in os.listdir(jars_dir)
    if name.endswith(".jar")
)
jars_str = ",".join(jars_list)
print(jars_str)

# Create a local-mode SparkSession. Driver JVM settings (memory, JVM flags)
# only take effect if this is the first session created in this kernel; once
# the JVM is running, getOrCreate() cannot change them.
spark = (
    SparkSession.builder.appName("LocalPySparkExample")
    .config("spark.jars", jars_str)
    .master("local[*]")
    .config("spark.driver.memory", "10g")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.default.parallelism", "100")
    # In local mode the executor runs inside the driver JVM, so
    # spark.executor.extraJavaOptions is never applied; the GC flags have to
    # be passed to the driver JVM instead.
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
    .getOrCreate()
)

print('Spark Version: ', spark.version)

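# Jar 包链接 (jar downloads)

Run this cell **before** the "初始化 PySpark" cell. That cell builds `spark.jars` from the `.jar` files present in `/home/jovyan/jars` when it runs, so jars downloaded afterwards are not picked up by an already-running session.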

In [3]:
# Aliyun OSS
#!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.18.2/aliyun-sdk-oss-3.18.2.jar
#!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.4.1/hadoop-aliyun-3.4.1.jar
# Paimon
#!wget -P /home/jovyan/jars https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-spark-3.4/1.3-SNAPSHOT/paimon-spark-3.4-1.3-20250707.003407-18.jar
#!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/org/apache/paimon/paimon-oss/1.2.0/paimon-oss-1.2.0.jar
# AWS (包含 s3a 支持)
!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.4.1/hadoop-aws-3.4.1.jar
!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.32.24/bundle-2.32.24.jar
# 其他
#!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.12.0/commons-configuration2-2.12.0.jar
#!wget -P /home/jovyan/jars https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar

--2025-08-19 03:16:36--  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.4.1/hadoop-aws-3.4.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.196.209, 199.232.192.209
Connecting to repo1.maven.org (repo1.maven.org)|199.232.196.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 865554 (845K) [application/java-archive]
Saving to: ‘/home/jovyan/jars/hadoop-aws-3.4.1.jar’


2025-08-19 03:16:37 (1.06 MB/s) - ‘/home/jovyan/jars/hadoop-aws-3.4.1.jar’ saved [865554/865554]

--2025-08-19 03:16:38--  https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.32.24/bundle-2.32.24.jar
Resolving repo1.maven.org (repo1.maven.org)... 146.75.92.209
Connecting to repo1.maven.org (repo1.maven.org)|146.75.92.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 684297060 (653M) [application/java-archive]
Saving to: ‘/home/jovyan/jars/bundle-2.32.24.jar’


2025-08-19 03:23:49 (1.52 MB/s) - ‘/home/jovyan/jars/bundle-2.32.24.jar

# 访问 OSS

In [None]:
# Read the OSS endpoint and credentials from the ossutil config file
# (INI format, [Credentials] section).
config_path = os.path.expanduser("~/.ossutilconfig")
parser = ConfigParser()
# ConfigParser.read() silently skips files it cannot open; fail loudly here
# instead of surfacing a confusing NoSectionError on the next line.
if not parser.read(config_path):
    raise FileNotFoundError(f"OSS config file not found or unreadable: {config_path}")
endpoint = parser.get("Credentials", "endpoint")
access_key_id = parser.get("Credentials", "accessKeyID")
access_key_secret = parser.get("Credentials", "accessKeySecret")

# getOrCreate() on an already-running session only applies runtime SQL
# configs, so the spark.hadoop.fs.oss.* settings below would not reach the
# Hadoop filesystem layer. Stop any active session so a fresh one is built.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

# Create a local-mode SparkSession with the Aliyun OSS filesystem configured.
spark = (
    SparkSession.builder.appName("LocalPySparkExample")
    .config("spark.jars", jars_str)
    .master("local[*]")
    .config("spark.driver.memory", "10g")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.shuffle.partitions", "100")
    .config("spark.default.parallelism", "100")
    # Local mode has no separate executor JVM, so JVM flags belong on the
    # driver (they only apply if the JVM has not been started yet).
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
    .config(
        "spark.hadoop.fs.oss.impl",
        "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem",
    )
    .config("spark.hadoop.fs.oss.accessKeyId", access_key_id)
    .config("spark.hadoop.fs.oss.accessKeySecret", access_key_secret)
    .config("spark.hadoop.fs.oss.endpoint", endpoint)
    .getOrCreate()
)

# 访问 MinIO

In [None]:
from pyspark.sql import SparkSession

# Read the MinIO endpoint and credentials (INI format, [Credentials] section).
config_path = os.path.expanduser("~/.minioconfig")
parser = ConfigParser()
# ConfigParser.read() silently skips files it cannot open; fail loudly here
# instead of surfacing a confusing NoSectionError on the next line.
if not parser.read(config_path):
    raise FileNotFoundError(f"MinIO config file not found or unreadable: {config_path}")
endpoint = parser.get("Credentials", "endpoint")
access_key_id = parser.get("Credentials", "accessKeyID")
access_key_secret = parser.get("Credentials", "accessKeySecret")

# getOrCreate() would otherwise return the session created by an earlier cell
# and ignore the spark.hadoop.fs.s3a.* settings below; stop it first.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

spark = (
    SparkSession.builder
    .appName("PySpark MinIO Example")
    .master("local[*]")
    # hadoop-aws and the AWS SDK bundle must be on the classpath for s3a://,
    # even if this cell is the one that creates the first session.
    .config("spark.jars", jars_str)
    # S3A settings
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)                   # MinIO server address
    .config("spark.hadoop.fs.s3a.access.key", access_key_id)            # MinIO access key (user name)
    .config("spark.hadoop.fs.s3a.secret.key", access_key_secret)        # MinIO secret key (password)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")            # must be true: MinIO uses path-style addressing
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Read sample data
df = spark.read.csv("s3a://mybucket/data.csv", header=True, inferSchema=True)
df.show()