## UDF

In [None]:
def get_track_id_column(row):
    """Return a Row with the raw JSON ``value`` plus its ``track_id`` field.

    ``track_id`` is None when the JSON object has no such key.
    """
    parsed = json.loads(row.value)
    return Row(value=row.value, track_id=parsed.get('track_id'))

In [None]:
def get_column_expanded(row, col_name='track_id'):
    """Return a Row with the raw ``value`` and one field pulled out of its JSON.

    The extra field is named ``col_name``; it is None when the key is absent.
    """
    raw = row.value
    extracted = {col_name: json.loads(raw).get(col_name)}
    return Row(value=raw, **extracted)

In [1]:
def get_some_columns_expanded(df, columns=('track_id',)):
    """Expand several top-level JSON fields of ``df.value`` into columns.

    The JSON in each record is parsed once, and all requested columns are
    emitted together in a single pass. An earlier per-column loop was slow
    because it re-parsed and rebuilt the DataFrame per column. It also lost
    every column but the last, because each pass kept only ``value`` plus the
    current column.

    Args:
        df: DataFrame with a string ``value`` column holding JSON objects.
        columns: Iterable of JSON keys to expose as columns; missing keys
            become None.

    Returns:
        A DataFrame with ``value`` plus one column per requested key. When
        ``columns`` is empty, ``df`` is returned unchanged.
    """
    col_names = list(columns)
    if not col_names:
        return df

    def _expand(row):
        # Parse once per record and pull out every requested key.
        row_dict = json.loads(row.value)
        return Row(value=row.value, **{name: row_dict.get(name) for name in col_names})

    return df.rdd.map(_expand).toDF()

In [None]:
def get_columns_expanded(row, col_names=['track_id']):
    """Return a Row with the raw ``value`` plus selected JSON fields.

    Only keys from ``col_names`` whose value is not None are added, so the
    fields of the returned Row can differ from record to record.
    NOTE(review): Rows with differing fields may break toDF() schema
    inference; confirm the callers tolerate this.
    """
    parsed = json.loads(row.value)
    pairs = ((name, parsed.get(name)) for name in col_names)
    present = {name: val for name, val in pairs if val is not None}
    return Row(value=row.value, **present)

In [None]:
def get_columns_expanded_only(row, col_names=['track_id']):
    """Return a Row containing only the selected, non-None JSON fields.

    Unlike ``get_columns_expanded``, the raw ``value`` is not carried over.
    """
    parsed = json.loads(row.value)
    pairs = ((name, parsed.get(name)) for name in col_names)
    present = {name: val for name, val in pairs if val is not None}
    return Row(**present)

In [None]:
def get_columns_inValue_only(row, col_names=['track_id']):
    """Shrink the JSON in ``row.value`` to the selected, non-None fields.

    Returns a Row whose single ``value`` field is the re-serialized subset,
    produced with the project helper ``json_dumps``.
    """
    parsed = json.loads(row.value)
    pairs = ((name, parsed.get(name)) for name in col_names)
    present = {name: val for name, val in pairs if val is not None}
    return Row(value=json_dumps(present))

In [None]:
def get_track_idAndContent(row):
    """Re-key a JSON record: its ``id`` becomes ``track_id``; ``content`` is kept.

    Returns a Row whose ``value`` is the serialized two-key object.
    """
    source = json.loads(row.value)
    payload = {
        'track_id': source.get('id'),
        'content': source.get('content'),
    }
    return Row(value=json_dumps(payload))

### 做差集

In [None]:
def subtract_by_id(df1, df2):
    """Return the rows of ``df1`` whose ``track_id`` does not occur in ``df2``.

    This uses a single left-anti join. It replaces a ``subtract`` (a distinct
    set difference) followed by an inner join back onto ``df1``, which cost an
    extra shuffle.

    Behavior matches that two-step form:
    - Duplicate ``track_id`` rows in ``df1`` are all kept.
    - Rows with a null ``track_id`` are dropped, because an inner join on a
      null key never matches.
    """
    keys_to_exclude = df2.select('track_id')
    return (
        df1.filter(df1['track_id'].isNotNull())
        .join(keys_to_exclude, on='track_id', how='left_anti')
    )

### 前后裁断

In [None]:
def get_first_last_cut_content(row, threshold=20000, keep=10000):
    """Split long content into its head and tail.

    If the record's length exceeds ``threshold``:
    - ``content_1`` is the first ``keep`` characters.
    - ``content_2`` is the last ``keep`` characters.

    Otherwise ``content_1`` is the whole content and ``content_2`` is ''.

    The length comes from the record's ``content_length`` field. When that
    field is missing, ``len(content)`` is used. A missing ``content`` no
    longer raises; it passes through as ``content_1`` (None).

    Args:
        row: Row whose ``value`` is a JSON object with ``track_id``,
            ``content`` and optionally ``content_length``.
        threshold: Length above which content is cut.
        keep: Number of characters kept from each end.

    Returns:
        Row(value=<JSON with track_id, content_1, content_2>).
    """
    row_dict = json.loads(row.value)
    content = row_dict.get("content")
    content_length = row_dict.get("content_length")
    if content_length is None:
        content_length = len(content) if content else 0

    if content and content_length > threshold:
        content_1 = content[:keep]
        # Slice from the real end of the string: the stored content_length may
        # not equal len(content), and content[content_length - keep:] would
        # then return the wrong span. Guard keep == 0 (content[-0:] is everything).
        content_2 = content[-keep:] if keep > 0 else ''
    else:
        content_1 = content
        content_2 = ''

    track_id = row_dict.get('track_id')
    new_row_dict = {'track_id': track_id, 'content_1': content_1, 'content_2': content_2}

    return Row(value=json_dumps(new_row_dict))

### content_list2content

In [None]:
from app.common_clean.core.content import get_text_content

def get_content(rows):
    """mapPartitions helper: flatten ``content_list`` into plain text.

    Records lacking a truthy ``track_id`` or ``content_list`` are skipped.
    Each remaining record yields a Row whose ``value`` is JSON with
    ``track_id`` and ``content``. Another ``get_content`` is defined in a
    later cell of this notebook; whichever cell ran last wins.
    """
    for row in rows:
        record = json.loads(row.value)
        track_id = record.get("track_id")
        content_list = record.get("content_list")
        if not (content_list and track_id):
            continue
        yield Row(value=json_dumps({
            "track_id": track_id,
            "content": get_text_content(content_list),
        }))

### MD5去重

In [None]:
import hashlib

def calculate_md5(content):
    """Return the hexadecimal MD5 digest of ``content`` (a bytes-like object)."""
    return hashlib.md5(content).hexdigest()

from xinghe.s3 import read_s3_object_bytes

In [None]:
def map_partition_md5(rows):
    """mapPartitions helper: compute the MD5 of each record's raw S3 object.

    Yields Row(track_id=..., raw_file_md5=...) for every input record.
    ``raw_file_md5`` is None in two cases:
    - The record has no ``path``.
    - The object cannot be read or hashed. This is still best-effort, but
      the failure is now logged instead of being silently discarded.
    """
    import logging

    logger = logging.getLogger(__name__)
    for row in rows:
        row_dict = json.loads(row.value)
        track_id, path = row_dict.get("track_id"), row_dict.get("path")
        md5 = None
        if path:
            try:
                md5 = calculate_md5(read_s3_object_bytes(path))
            except Exception:
                # One unreadable object must not fail the whole partition.
                logger.warning(
                    "md5 failed for track_id=%s path=%s", track_id, path, exc_info=True
                )
        yield Row(track_id=track_id, raw_file_md5=md5)

### 获取content

In [None]:
def get_content(rows):
    """mapPartitions helper: turn each record's ``content_list`` into text.

    Every input record yields exactly one Row(value=<JSON with track_id,
    content>). ``content`` falls back to "" when ``get_text_content``
    returns None or raises TypeError. NOTE(review): TypeError presumably
    comes from a missing/None ``content_list``; confirm.
    """
    for row in rows:
        record = json.loads(row.value)
        track_id = record.get("track_id")
        try:
            text = get_text_content(record.get("content_list"))
        except TypeError:
            text = None
        yield Row(value=json_dumps({
            "track_id": track_id,
            "content": "" if text is None else text,
        }))

In [None]:
def pipline_normalize_content(rows):
    """mapPartitions helper: run the cleaning rules and emit normalized text.

    Each record is passed to ``run_handler`` with ``_clean_rule_list``:
    - Records for which a handler reported an error are dropped.
    - For the rest, the cleaned ``content_list`` is flattened with
      ``get_text_content``, and Row(value=<JSON with track_id, content>)
      is yielded. ``content`` falls back to "" when flattening returns None
      or raises TypeError.
    """
    for row in rows:
        row_dict = json.loads(row.value)
        track_id = row_dict.get("track_id")

        err_handler, res_data = run_handler(
            data_dict=row_dict, handler_list=_clean_rule_list, short_cut=False, verbose=False
        )
        # Check the error first. Failed records are discarded anyway, so there
        # is no point flattening them, and res_data may not be a usable dict then.
        if err_handler:
            continue

        try:
            normalized_content = get_text_content(res_data.get("content_list"))
        except TypeError:
            normalized_content = None
        if normalized_content is None:
            normalized_content = ""

        content_dict = {"track_id": track_id, "content": normalized_content}
        yield Row(value=json_dumps(content_dict))

## 算子

In [None]:
# Inspect which keys the "remark" object has in the first raw-metadata record.
# NOTE(review): raises if book_raw_metadata is empty (take(1)[0]) or the record has no "remark" (None.keys()).
json.loads(book_raw_metadata.take(1)[0].value).get("remark").keys()

### 展开字段

In [None]:
from pyspark.sql.functions import to_json, struct

# Pack every column of md5_df into a single JSON string column named "value".
md5_df_Invalue = md5_df.select(
    to_json(struct(*md5_df.columns)).alias("value")
)

In [None]:
# Parse each JSON "value" into a dict and let Spark infer the columns from those dicts.
df.rdd.map(lambda rec: json.loads(rec["value"])).toDF()

### 收缩字段

In [None]:
# Collapse all columns of md5_df into one JSON "value" column using the json_dumps helper.
md5_df_value = md5_df.rdd.map(lambda rec: Row(value=json_dumps(rec.asDict()))).toDF()

### 转为pandas dataframe

In [None]:
# Convert to a pandas DataFrame (collects all rows to the driver).
df_pd = book_metadata_anno_combined_expanded.toPandas()

### dataframe的filter

In [None]:
# `col` is first imported much later in this notebook; import it here so the cell runs on its own.
from pyspark.sql.functions import col

# Keep rows whose "category" is not the empty string. A null category makes the
# comparison null (not true), so null rows are dropped as well.
df_filtered = book_metadata_anno_expanded.filter(col("category") != "")

### rdd的filter（使用row）

In [None]:
# Keep only records whose JSON "path" key is present and not null.
# Compare to None by identity (`is not None`) rather than `!=`.
book_raw_noAnno_path_nonan = book_raw_noAnno_path.rdd.filter(
    lambda row: json.loads(row.value).get("path") is not None
).toDF()

### 合并操作

In [None]:
# Inner-join the textbook ids with their annotations on track_id.
textbook_m9_zh_id_anno = textbook_m9_zh_id.join(
    textbook_metadata_anno_id, 'track_id', 'inner'
)

### 统计null数量

In [None]:
# `F` is not imported anywhere in the visible notebook; bring it into scope here.
from pyspark.sql import functions as F

# Null count per column: F.when(...) is non-null only for null cells, and F.count skips nulls.
missing_counts = book_raw_metadata_expanded.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in book_raw_metadata_expanded.columns]
)

### 改列名

In [None]:
# Rename the two major-level columns to explicit 1st/2nd-level names.
df_write_expanded = (
    df_write_expanded
    .withColumnRenamed("major", "major_1st_level")
    .withColumnRenamed("course", "major_2nd_level")
)

### partition

In [None]:
# Number of partitions backing df_nocontent.
df_nocontent.rdd.getNumPartitions()

In [None]:
# Redistribute df_nocontent into 5 partitions.
df_nocontent = df_nocontent.repartition(5)

### 随机采样

In [None]:
# Random sample of roughly 10% of the rows; the fixed seed makes it reproducible.
df_download_limit = df_download.sample(fraction=0.1, seed=42)

### 丢弃null

In [None]:
# Drop rows whose "name" is null.
non_null_name_df = df.dropna(subset=["name"])

### 求一列之和

In [None]:
# Cast bytes_len to int so the column can be summed.
df_featuresAndExample_zh_bytes = df_featuresAndExample_zh_expanded.withColumn("bytes_len", col("bytes_len").cast("int"))

In [None]:
from pyspark.sql import functions as F

# Total of bytes_len over all rows. Use F.sum explicitly: a bare `sum` is
# Python's builtin unless pyspark's sum was imported earlier, and the builtin
# raises TypeError on a column name.
df_featuresAndExample_zh_bytes.select(F.sum("bytes_len")).collect()[0][0]

### 管线清洗

In [None]:
from app.common_clean.feature_filter import FeatureCleaner

def map_clean(rows):
    """mapPartitions helper: apply FeatureCleaner to every record.

    A single cleaner is built per partition and reused for all its rows.
    Records for which ``clean_data`` reports an error are dropped. The rest
    are re-serialized into Row(value=<json>).
    """
    cleaner = FeatureCleaner()
    for row in rows:
        err, cleaned = cleaner.clean_data(json.loads(row.value))
        if err:
            continue
        yield Row(value=json_dumps(cleaned))

### 丢弃某列

In [None]:
# Remove the "ID" column.
df_dropped = df.drop("ID")

## 去重

In [None]:
# Keep one row per distinct raw_file_md5 (which duplicate survives is not specified).
distinct_df = df.dropDuplicates(["raw_file_md5"])

### sql方法

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView("files")

# Remove every raw_file_md5 that occurs more than once: all rows sharing a
# duplicated md5 are dropped (no copy is kept), unlike dropDuplicates.
unique_df_sql = spark.sql("""
SELECT raw_file_md5, timestamp
FROM (
    SELECT raw_file_md5, timestamp,
           COUNT(*) OVER (PARTITION BY raw_file_md5) as cnt
    FROM files
) t
WHERE cnt = 1
""")

# Show the result.
unique_df_sql.show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create the SparkSession.
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample DataFrame with raw_file_md5 and timestamp columns.
data = [
    ("file1", "2023-01-01"),
    ("file2", "2023-01-02"),
    ("file1", "2023-01-03"),
    ("file3", "2023-01-04"),
    ("file2", "2023-01-05")
]

columns = ["raw_file_md5", "timestamp"]

df = spark.createDataFrame(data, columns)

# Count occurrences of each distinct (raw_file_md5, timestamp) pair.
# NOTE(review): grouping by both columns only removes fully identical rows. In
# this sample every pair is unique, so nothing is removed. Group by
# "raw_file_md5" alone (as the SQL variant does) if duplicate md5s are the target.
grouped_df = df.groupBy(*columns).count()

# Drop rows whose count is greater than 1.
filtered_df = grouped_df.filter(col("count") == 1)

# Select the original columns so the result has the same structure as df.
result_df = filtered_df.select(columns)

# Show the result.
result_df.show()

In [None]:
# Register the DataFrame as a temporary view.
df.createOrReplaceTempView("files")

# Deduplicate with Spark SQL: keep, for each raw_file_md5, the row that sorts
# first by timestamp (row number 1 within the md5 partition).
distinct_df_sql = spark.sql("""
SELECT raw_file_md5, timestamp
FROM (
    SELECT raw_file_md5, timestamp,
           ROW_NUMBER() OVER (PARTITION BY raw_file_md5 ORDER BY timestamp) as rn
    FROM files
) t
WHERE rn = 1
""")

# Show the result.
distinct_df_sql.show()

## 创建value，再展开

### 方法一

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, from_json, isnull
from pyspark.sql.types import StructType, StructField, StringType

# Initialize the SparkSession.
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample JSON strings; the middle one is an error record without name/age/city.
data = [
    '{"name": "Alice", "age": 30, "city": "New York"}',
    '{"ERROR_RESULT": null}',
    '{"name": "Bob", "age": 25, "city": "Los Angeles"}'
]

# Build a single string-column DataFrame named "value" directly from the strings.
df_write = spark.createDataFrame(data, StringType()).toDF("value")

In [None]:
# JSON schema for the records; every field is declared as a nullable string.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("city", StringType(), True),
    StructField("ERROR_RESULT", StringType(), True)
])

# Parse each JSON string into a struct column named "parsed_json".
parsed_df = df_write.withColumn("parsed_json", from_json(col("value"), schema))

In [None]:
# Flatten the struct built by from_json (column "parsed_json" on parsed_df)
# into top-level name/age/city columns.
df_final = parsed_df.select(
    col("parsed_json.name"),
    col("parsed_json.age"),
    col("parsed_json.city")
)

### 方法二

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct

# Initialize the SparkSession.
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample multi-column DataFrame; the second row is entirely null.
data = [
    {"name": "Alice", "age": 30, "city": "New York"},
    {"name": None, "age": None, "city": None},
    {"name": "Bob", "age": 25, "city": "Los Angeles"}
]

df = spark.createDataFrame(data)

# Pack all columns into one JSON string column named "value". Null fields are
# left out of the JSON, so the all-null row becomes {}.
df_json = df.select(to_json(struct(df.columns)).alias("value"))

# Show the resulting DataFrame without truncating the strings.
df_json.show(truncate=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/24 09:50:37 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/01/24 09:51:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
                                                                                

+--------------------------------------------+
|value                                       |
+--------------------------------------------+
|{"age":30,"city":"New York","name":"Alice"} |
|{}                                          |
|{"age":25,"city":"Los Angeles","name":"Bob"}|
+--------------------------------------------+



In [24]:
# Show the multi-column source DataFrame.
df.show()

[Stage 9:>                                                          (0 + 1) / 1]

+----+-----------+-----+
| age|       city| name|
+----+-----------+-----+
|  30|   New York|Alice|
|null|       null| null|
|  25|Los Angeles|  Bob|
+----+-----------+-----+



                                                                                

### 方法三

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

In [None]:
# Initialize the SparkSession.
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample JSON strings.
data = [
    '{"name": "Alice", "age": 30, "city": "New York"}',
    '{"ERROR_RESULT": null}',
    '{"name": "Bob", "age": 25, "city": "Los Angeles"}'
]

# Wrap each JSON string in a Row with a single "value" field.
rows = [Row(value=json_str) for json_str in data]

# Explicit schema: one nullable string column named "value".
schema = StructType([
    StructField("value", StringType(), True)
])

# Create the DataFrame using that schema.
df_write = spark.createDataFrame(rows, schema)

# Show the DataFrame without truncating the strings.
df_write.show(truncate=False)

In [None]:
from pyspark.sql import SparkSession

# Initialize the SparkSession.
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample JSON strings from which the DataFrame is built.
data = [
    '{"name": "Alice", "age": 30, "city": "New York"}',
    '{"ERROR_RESULT": null}',
    '{"name": "Bob", "age": 25, "city": "Los Angeles"}'
]

In [None]:
# Build the single-column DataFrame directly from the strings.
# NOTE(review): StringType is not imported in this cell; it relies on an earlier import.
df_write = spark.createDataFrame(data, StringType()).toDF("value")
# Show the DataFrame without truncating the strings.
df_write.show(truncate=False)

In [None]:
# Show the DataFrame without truncating the strings.
df_write.show(truncate=False)

## groupby

In [None]:
from pyspark.sql import SparkSession
# NOTE: this import shadows Python's builtins sum, max and min for the rest of the
# notebook session, so later cells calling them get the Spark column functions.
# Importing `pyspark.sql.functions as F` avoids this.
from pyspark.sql.functions import col, avg, sum, count, max, min, mean, stddev, var_pop, first, last, collect_list, collect_set

In [None]:
spark = SparkSession.builder \
    .appName("PySpark GroupBy and Agg Example") \
    .getOrCreate()
# Sample sales records: (product, category, sales, quantity).
data = [
    ("Product A", "Category 1", 100.0, 5),
    ("Product B", "Category 2", 150.0, 3),
    ("Product C", "Category 1", 200.0, 8),
    ("Product D", "Category 2", 250.0, 6),
    ("Product E", "Category 3", 300.0, 4)
]

columns = ["product", "category", "sales", "quantity"]

df = spark.createDataFrame(data, columns)
df.show()

In [None]:
from pyspark.sql.functions import sum, avg

# Total and average sales per category.
grouped_df = (
    df.groupBy("category")
    .agg(
        sum("sales").alias("total_sales"),
        avg("sales").alias("avg_sales"),
    )
)

grouped_df.show()

In [None]:
# Total and average sales per (category, product) pair.
grouped_df_multiple = (
    df.groupBy("category", "product")
    .agg(
        sum("sales").alias("total_sales"),
        avg("sales").alias("avg_sales"),
    )
)

grouped_df_multiple.show()

In [None]:
# Several aggregates per category in one pass. Relies on sum/avg/max/min/count
# having been imported from pyspark.sql.functions by an earlier cell.
grouped_df_advanced = (
    df.groupBy("category")
    .agg(
        sum("sales").alias("total_sales"),
        avg("sales").alias("avg_sales"),
        max("sales").alias("max_sales"),
        min("sales").alias("min_sales"),
        count("product").alias("num_products"),
    )
)

grouped_df_advanced.show()

In [None]:
from pyspark.sql.functions import expr

tax_rate = 0.1
# Per-category total sales scaled by (1 + tax_rate). `expr` is imported but not used here.
grouped_df_with_expr = (
    df.groupBy("category")
    .agg((sum("sales") * (1 + tax_rate)).alias("total_sales_incl_tax"))
)

grouped_df_with_expr.show()

## to_json

In [3]:
# Inspect the schema: the to_json result is a single string column.
df_json.printSchema()

root
 |-- value: string (nullable = true)



In [4]:
# struct(...) over all column names builds one struct Column.
struct(df.columns)

Column<'struct(age, city, name)'>

In [6]:
# Wrapping that struct in to_json gives a JSON-string Column.
to_json(struct(df.columns))

Column<'to_json(struct(age, city, name))'>

In [8]:
data = [
    {"name": "Alice", "age": 30, "city": "New York"},
    {"name": None, "age": None, "city": None},
    {"name": "Bob", "age": 25, "city": "Los Angeles"}
]

df = spark.createDataFrame(data)

# Convert all columns into one JSON string; the {"pretty": "true"} option makes
# to_json emit indented, multi-line JSON.
df_json = df.select(to_json(struct(df.columns), {"pretty": "true"}).alias("value"))

In [9]:
# Show the pretty-printed JSON without truncation.
df_json.show(truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+----------------------------------------------------------------+
|value                                                           |
+----------------------------------------------------------------+
|{\n  "age" : 30,\n  "city" : "New York",\n  "name" : "Alice"\n} |
|{ }                                                             |
|{\n  "age" : 25,\n  "city" : "Los Angeles",\n  "name" : "Bob"\n}|
+----------------------------------------------------------------+



                                                                                

+-------------------------------------------------+
|value                                            |
+-------------------------------------------------+
|{"name": "Alice", "age": 30, "city": "New York"} |
|{"ERROR_RESULT": null}                           |
|{"name": "Bob", "age": 25, "city": "Los Angeles"}|
+-------------------------------------------------+



[Stage 4:>                                                          (0 + 1) / 1]

+-------------------------------------------------+
|value                                            |
+-------------------------------------------------+
|{"name": "Alice", "age": 30, "city": "New York"} |
|{"ERROR_RESULT": null}                           |
|{"name": "Bob", "age": 25, "city": "Los Angeles"}|
+-------------------------------------------------+



                                                                                

In [18]:
# Scratch arithmetic; the operands presumably are record counts — TODO confirm.
1748593+1692+6368296

8118581

In [20]:
# Scratch arithmetic; the operands presumably are record counts — TODO confirm.
1748593+1692

1750285

In [None]:
# Scratch value noted during analysis; its meaning is not recorded here.
1775350

In [None]:
# Scratch value noted during analysis; its meaning is not recorded here.
1751349

In [19]:
# Scratch arithmetic; the operands presumably are record counts — TODO confirm.
8108880-6368304

1740576

In [None]:
# Scratch value noted during analysis; its meaning is not recorded here.
6368304

In [None]:
# Scratch value noted during analysis; its meaning is not recorded here.
17627406

In [22]:
# Check: in Python, None is not equal to the empty string (evaluates to False).
None == ''

False

In [23]:
# Check: so a `!= ''` test in Python is True for None and does not filter it out.
None != ''

True