In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def spark_rdd_tasks(spark):
    # 读取user_balance_table数据为RDD并跳过标题行
    user_balance_rdd = spark.sparkContext.textFile("/FileStore/tables/user_balance_table.csv") \
     .filter(lambda line: not line.startswith("user_id")) \
     .map(lambda line: line.split(",")) \
     .map(lambda row: (row[1], (int(row[5]), int(row[8]))))  # (日期, (资金流入, 资金流出))

    # 查询所有天的资金流入和流出情况
    all_dates_flow = user_balance_rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
     .map(lambda x: (x[0], x[1][0], x[1][1]))

    schema = StructType([
        StructField("date", StringType(), True),
        StructField("inflow", IntegerType(), True),
        StructField("outflow", IntegerType(), True)
    ])
    df = spark.createDataFrame(all_dates_flow, schema)
    df.write.mode("overwrite").saveAsTable("fund_flow")
    df.show()
    display(df)


if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    spark_rdd_tasks(spark)

+--------+---------+---------+
|    date|   inflow|  outflow|
+--------+---------+---------+
|20140808|231616935|311648757|
|20140820|306133606|202452782|
|20140823|139192764|199377531|
|20140829|265335172|273756380|
|20140815|242336052|236516007|
|20140806|286598298|282346594|
|20140728|369535407|345986909|
|20140707|269914951|317612569|
|20140328|223056705|405443946|
|20140322|188663159|138039412|
|20140831|272878511|292943033|
|20140825|307361141|312413411|
|20140726|126012255|282653341|
|20140720|174193562|174462836|
|20140717|250755039|298279385|
|20140711|206409952|240050748|
|20140708|221992553|340453063|
|20140705|167078182|272535138|
|20140702|382237475|328950951|
|20140321|279298412|259655286|
+--------+---------+---------+
only showing top 20 rows



date,inflow,outflow
20140808,231616935,311648757
20140820,306133606,202452782
20140823,139192764,199377531
20140829,265335172,273756380
20140815,242336052,236516007
20140806,286598298,282346594
20140728,369535407,345986909
20140707,269914951,317612569
20140328,223056705,405443946
20140322,188663159,138039412


In [0]:
from pyspark.sql import SparkSession


def spark_rdd_tasks(spark):
    # 读取user_balance_table数据为RDD并跳过标题行，假设数据结构中包含user_id和日期字段
    user_balance_rdd = spark.sparkContext.textFile("/FileStore/tables/user_balance_table.csv") \
     .filter(lambda line: not line.startswith("user_id")) \
     .map(lambda line: line.split(",")) \
     .map(lambda row: (row[0], row[1]))  # (user_id, date)

    # 过滤出2014年8月的数据
    august_2014_rdd = user_balance_rdd.filter(lambda x: x[1].startswith("201408"))

    # 按用户ID分组，统计每个用户在8月的记录天数
    user_days_count_rdd = august_2014_rdd.map(lambda x: (x[0], 1)) \
     .reduceByKey(lambda a, b: a + b)

    # 筛选出记录天数至少为5天的活跃用户
    active_users_rdd = user_days_count_rdd.filter(lambda x: x[1] >= 5)

    # 统计活跃用户总数
    active_user_count = active_users_rdd.count()
    print("2014年8月的活跃用户总数: ", active_user_count)

if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    spark_rdd_tasks(spark)

2014年8月的活跃用户总数:  12767


In [0]:
from pyspark.sql import SparkSession

def read_and_display_daily_flow_table(spark):
    # 表名
    table_name = "daily_flow_table"

    # 读取表格数据
    daily_flow_table_df = spark.read.table(table_name)

    # 使用display展示表格数据
    display(daily_flow_table_df)

if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    read_and_display_daily_flow_table(spark)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
from pyspark.sql import SparkSession

def calculate_city_avg_balance(spark):
    # 读取user_profile_table和user_balance_table数据为DataFrame，假设文件有标题行
    user_profile_df = spark.read.csv("/FileStore/tables/user_profile_table.csv", header=True)
    user_balance_df = spark.read.csv("/FileStore/tables/user_balance_table.csv", header=True)

    # 创建临时视图
    user_profile_df.createOrReplaceTempView("user_profile")
    user_balance_df.createOrReplaceTempView("user_balance")

    # 使用Spark SQL查询计算每个城市在2014年3月1日的平均余额并按降序排列
    query = """
        SELECT up.City, AVG(ub.tBalance) AS avg_balance
        FROM user_profile up
        JOIN user_balance ub ON up.user_id = ub.user_id
        WHERE ub.report_date = '20140301'
        GROUP BY up.City
        ORDER BY avg_balance DESC
    """
    result = spark.sql(query)

    # 显示结果
    result.show()

if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    calculate_city_avg_balance(spark)

+-------+------------------+
|   City|       avg_balance|
+-------+------------------+
|6281949| 2795923.837298216|
|6301949|2650775.0664451825|
|6081949|2643912.7566638007|
|6481949|2087617.2136986302|
|6411949|1929838.5617977527|
|6412149| 1896363.471625767|
|6581949|1526555.5551020408|
+-------+------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

def top_3_users_by_city(spark):
    # 读取user_profile_table和user_balance_table数据为DataFrame，假设文件有标题行
    user_profile_df = spark.read.csv("/FileStore/tables/user_profile_table.csv", header=True)
    user_balance_df = spark.read.csv("/FileStore/tables/user_balance_table.csv", header=True)

    # 创建临时视图
    user_profile_df.createOrReplaceTempView("user_profile")
    user_balance_df.createOrReplaceTempView("user_balance")

    # 使用Spark SQL计算每个用户在2014年8月整月的总流量，先按用户和城市分组求和
    query = """
        SELECT up.City, ub.user_id, SUM(ub.total_purchase_amt + ub.total_redeem_amt) AS total_flow
        FROM user_profile up
        JOIN user_balance ub ON up.user_id = ub.user_id
        WHERE ub.report_date LIKE '201408%'
        GROUP BY up.City, ub.user_id
    """
    total_flow_df = spark.sql(query)

    # 为总流量列创建临时视图
    total_flow_df.createOrReplaceTempView("total_flow")

    # 使用窗口函数按城市对总流量进行排名
    query2 = """
        SELECT City, user_id, total_flow,
               RANK() OVER (PARTITION BY City ORDER BY total_flow DESC) AS rank
        FROM total_flow
    """
    ranked_df = spark.sql(query2)

    # 选择每个城市排名前三的用户
    top_3_users = ranked_df.filter(ranked_df["rank"] <= 3)

    # 显示结果
    top_3_users.show()

if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    top_3_users_by_city(spark)

+-------+-------+------------+----+
|   City|user_id|  total_flow|rank|
+-------+-------+------------+----+
|6081949|  27235| 1.0847568E8|   1|
|6081949|  27746| 7.6065458E7|   2|
|6081949|  18945| 5.5304049E7|   3|
|6281949|  15118|1.49311909E8|   1|
|6281949|  11397|1.24293438E8|   2|
|6281949|  25814|1.04428054E8|   3|
|6301949|   2429|1.09171121E8|   1|
|6301949|  26825|  9.537403E7|   2|
|6301949|  10932| 7.4016744E7|   3|
|6411949|    662| 7.5162566E7|   1|
|6411949|  21030| 4.9933641E7|   2|
|6411949|  16769| 4.9383506E7|   3|
|6412149|  22585|2.00516731E8|   1|
|6412149|  14472| 1.3826279E8|   2|
|6412149|  25147| 7.0594902E7|   3|
|6481949|  12026| 5.1161825E7|   1|
|6481949|    670| 4.9626204E7|   2|
|6481949|  14877| 3.4488733E7|   3|
|6581949|   9494| 3.8854436E7|   1|
|6581949|  26876| 2.3449539E7|   2|
+-------+-------+------------+----+
only showing top 20 rows



+-------+-------+------------+----+
|   City|user_id|  total_flow|rank|
+-------+-------+------------+----+
|6081949|  27235| 1.0847568E8|   1|
|6081949|  27746| 7.6065458E7|   2|
|6081949|  18945| 5.5304049E7|   3|
|6281949|  15118|1.49311909E8|   1|
|6281949|  11397|1.24293438E8|   2|
|6281949|  25814|1.04428054E8|   3|
|6301949|   2429|1.09171121E8|   1|
|6301949|  26825|  9.537403E7|   2|
|6301949|  10932| 7.4016744E7|   3|
|6411949|    662| 7.5162566E7|   1|
|6411949|  21030| 4.9933641E7|   2|
|6411949|  16769| 4.9383506E7|   3|
|6412149|  22585|2.00516731E8|   1|
|6412149|  14472| 1.3826279E8|   2|
|6412149|  25147| 7.0594902E7|   3|
|6481949|  12026| 5.1161825E7|   1|
|6481949|    670| 4.9626204E7|   2|
|6481949|  14877| 3.4488733E7|   3|
|6581949|   9494| 3.8854436E7|   1|
|6581949|  26876| 2.3449539E7|   2|
+-------+-------+------------+----+
only showing top 20 rows


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

def calculate_and_store_daily_flows(spark):
    # 读取user_balance_table数据为DataFrame，假设文件有标题行
    user_balance_df = spark.read.csv("/FileStore/tables/user_balance_table.csv", header=True)

    # 创建临时视图，方便后续使用Spark SQL进行操作
    user_balance_df.createOrReplaceTempView("user_balance_view")

    # 先判断并删除已存在的daily_flow_temp_view临时视图
    spark.sql("DROP VIEW IF EXISTS daily_flow_temp_view")

    # 使用Spark SQL语句计算每天的总资金流入和流出量，并创建临时视图存储结果
    spark.sql("""
        CREATE TEMPORARY VIEW daily_flow_temp_view AS
        SELECT
            report_date,
            SUM(total_purchase_amt) AS total_inflow,
            SUM(total_redeem_amt) AS total_outflow
        FROM user_balance_view
        GROUP BY report_date
    """)

    # 对临时视图中的数据按照日期从小到大排序
    sorted_temp_view_df = spark.sql("SELECT * FROM daily_flow_temp_view ORDER BY report_date ASC")

    # 先判断prepare_data表是否存在，如果存在则删除
    spark.sql("DROP TABLE IF EXISTS prepare_data")

    # 使用Window函数结合row_number为每行数据添加行号（从1开始计数）
    window_spec = Window.orderBy(col("report_date"))
    sorted_temp_view_df = sorted_temp_view_df.withColumn("row_num", row_number().over(window_spec))

    # 将添加行号后的数据插入到prepare_data表中（采用overwrite模式，如果表已存在则覆盖）
    sorted_temp_view_df.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("prepare_data")
    # 从prepare_data表中读取数据并显示
    prepare_data_df = spark.sql("SELECT * FROM prepare_data")
    prepare_data_df.show()

if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    calculate_and_store_daily_flows(spark)

+-----------+------------+-------------+-------+
|report_date|total_inflow|total_outflow|row_num|
+-----------+------------+-------------+-------+
|   20130701| 3.2488348E7|    5525022.0|      1|
|   20130702|  2.903739E7|    2554548.0|      2|
|   20130703|  2.727077E7|    5953867.0|      3|
|   20130704| 1.8321185E7|    6410729.0|      4|
|   20130705| 1.1648749E7|    2763587.0|      5|
|   20130706| 3.6751272E7|    1616635.0|      6|
|   20130707|   8962232.0|    3982735.0|      7|
|   20130708| 5.7258266E7|    8347729.0|      8|
|   20130709| 2.6798941E7|    3473059.0|      9|
|   20130710| 3.0696506E7|    2597169.0|     10|
|   20130711| 4.4075197E7|    3508800.0|     11|
|   20130712| 3.4183904E7|    8492573.0|     12|
|   20130713| 1.5164717E7|    3482829.0|     13|
|   20130714| 2.2615303E7|    2784107.0|     14|
|   20130715| 4.8128555E7|  1.3107943E7|     15|
|   20130716| 5.0622847E7|  1.1864981E7|     16|
|   20130717| 2.9015682E7|  1.0911513E7|     17|
|   20130718| 2.4234

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

def prepare_data(spark):
    """
    从prepare_data表读取数据，添加行号作为特征相关的一部分，进行数据清洗以及特征工程，返回处理好的数据DataFrame
    """
    # 从prepare_data表读取数据
    data_df = spark.sql("SELECT * FROM prepare_data")

    # 使用Window函数结合row_number为每行数据添加行号（从1开始计数）
    window_spec = Window.orderBy(lit('A'))
    data_df = data_df.withColumn("row_num", row_number().over(window_spec))

    # 定义特征列（这里将行号作为特征，可根据实际情况调整，添加更多特征）
    feature_cols = ["row_num"]

    # 组装特征向量（针对申购总额预测）
    assembler_inflow = VectorAssembler(inputCols=feature_cols, outputCol="features_inflow")
    assembled_inflow_df = assembler_inflow.transform(data_df)

    # 组装特征向量（针对赎回总额预测）
    assembler_outflow = VectorAssembler(inputCols=feature_cols, outputCol="features_outflow")
    assembled_outflow_df = assembler_outflow.transform(data_df)

    return assembled_inflow_df.select("row_num", "features_inflow", "total_inflow"), assembled_outflow_df.select("row_num", "features_outflow", "total_outflow")

def train_model(inflow_df, outflow_df):
    """
    划分训练集和测试集（这里全部现有数据作为训练集），训练线性回归模型，分别返回申购和赎回总额预测的训练好的模型
    """
    # 划分训练集（这里将所有数据都作为训练集，因为要预测后续位置的数据，可根据实际调整）
    train_inflow_df = inflow_df
    train_outflow_df = outflow_df

    # 创建线性回归模型（申购总额预测）
    lr_inflow = LinearRegression(featuresCol="features_inflow", labelCol="total_inflow")
    # 训练模型（申购总额预测）
    model_inflow = lr_inflow.fit(train_inflow_df)

    # 创建线性回归模型（赎回总额预测）
    lr_outflow = LinearRegression(featuresCol="features_outflow", labelCol="total_outflow")
    # 训练模型（赎回总额预测）
    model_outflow = lr_outflow.fit(train_outflow_df)

    return model_inflow, model_outflow

def predict(model_inflow, model_outflow, num_predictions):
    """
    创建用于预测的数据（基于行号往后延续），使用训练好的模型进行预测，整理并返回预测结果DataFrame
    """
    # 获取已有数据的最大行号
    spark = SparkSession.builder.getOrCreate()
    max_row_num = spark.sql("SELECT MAX(row_num) AS max_num FROM prepare_data").collect()[0]["max_num"]

    # 创建包含预测行号的数据DataFrame
    row_num_list = [(i,) for i in range(max_row_num + 1, max_row_num + 1 + num_predictions)]
    row_num_rdd = spark.sparkContext.parallelize(row_num_list)
    row_num_df = row_num_rdd.toDF(["row_num"])

    # 组装特征向量（针对申购总额预测）
    assembler_inflow = VectorAssembler(inputCols=["row_num"], outputCol="features_inflow")
    assembled_inflow_df = assembler_inflow.transform(row_num_df)

    # 组装特征向量（针对赎回总额预测）
    assembler_outflow = VectorAssembler(inputCols=["row_num"], outputCol="features_outflow")
    assembled_outflow_df = assembler_outflow.transform(row_num_df)

    # 进行申购总额预测
    predictions_inflow = model_inflow.transform(assembled_inflow_df)
    result_inflow_df = predictions_inflow.select(col("row_num"), col("prediction").alias("predicted_inflow"))

    # 进行赎回总额预测
    predictions_outflow = model_outflow.transform(assembled_outflow_df)
    result_outflow_df = predictions_outflow.select(col("row_num"), col("prediction").alias("predicted_outflow"))

    # 合并预测结果
    result_df = result_inflow_df.join(result_outflow_df, on="row_num", how="outer")

    return result_df

def main():
    spark = SparkSession.builder.getOrCreate()

    # 准备数据
    inflow_df, outflow_df = prepare_data(spark)

    # 训练模型
    model_inflow, model_outflow = train_model(inflow_df, outflow_df)

    # 假设要预测的数据个数（对应原需求里预测2014年9月每天的数据，这里可按天数等情况确定个数）
    num_predictions = 30

    # 进行预测
    result_df = predict(model_inflow, model_outflow, num_predictions)
    display(result_df)

if __name__ == "__main__":
    main()

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

row_num,predicted_inflow,predicted_outflow
428,361776970.4623478,345236643.5413133
429,362454241.6100138,346054102.3303232
430,363131512.7576798,346871561.11933297
431,363808783.9053458,347689019.90834284
432,364486055.0530118,348506478.69735265
433,365163326.20067775,349323937.4863625
434,365840597.3483437,350141396.2753723
435,366517868.4960097,350958855.0643822
436,367195139.6436757,351776313.853392
437,367872410.7913417,352593772.6424018


In [0]:
from pyspark.sql import SparkSession

def read_and_display_daily_flow_table(spark):
    # 表名
    table_name = "prediction_results_201409"

    # 读取表格数据
    daily_flow_table_df = spark.read.table(table_name)

    # 使用display展示表格数据
    display(daily_flow_table_df)

if __name__ == "__main__":
    # 获取已存在的SparkSession
    spark = SparkSession.builder.getOrCreate()
    read_and_display_daily_flow_table(spark)

report_date,redemption_prediction
