# 1 读取数据

In [16]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Obtain the Spark session for this notebook (getOrCreate returns an existing one if present).
spark = (
    SparkSession.builder
    .appName("task3")
    .getOrCreate()
)

# Load the training CSV: the first row is the header and column types are inferred.
train_csv_path = r'E:\大三上课程\金融大数据\实验\Experiment4\train_data.csv'
csv_reader = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .option('delimiter', ',')
)
df = csv_reader.csv(train_csv_path)

# 2 计算employer_type分布
## 2.1 统计总数

In [17]:
# Total number of rows in the loaded data; reused below as the denominator
# when turning per-group counts into proportions.
total_num = df.count()
print('总行数：', total_num)

总行数： 300000


## 2.2 新建一列值设为1，分组求和，除以总数即是占比

In [18]:
# Tag every row with a constant 1 so that summing the tag per group gives the group size.
df = df.withColumn('employer_type_count', functions.lit(1))

# Sum the tag per employer type; the aggregate column is auto-named 'sum(employer_type_count)'.
per_type_sum = df.groupby('employer_type').agg({'employer_type_count': 'sum'})

# Divide each group size by the overall row count to obtain the share of each type.
proportion = per_type_sum['sum(employer_type_count)'] / total_num
result1 = per_type_sum.select(
    'employer_type',
    proportion.alias('employer_type_prop'),
)
result1.show()

+--------------+--------------------+
| employer_type|  employer_type_prop|
+--------------+--------------------+
|幼教与中小学校| 0.09998333333333333|
|      上市企业| 0.10012666666666667|
|      政府机构| 0.25815333333333335|
|    世界五百强|0.053706666666666666|
|  高等教育机构| 0.03368666666666666|
|      普通企业|  0.4543433333333333|
+--------------+--------------------+



## 2.3 输出

In [19]:
# Bring the (small) aggregated result to the driver and write it as "type,proportion" lines.
output1 = result1.collect()
# The employer type names are Chinese text, so the encoding is given explicitly
# instead of relying on the platform default. The `with` block closes the file
# on exit, so no separate close() call is needed.
with open(r"E:\大三上课程\金融大数据\实验\Experiment4\employerType.csv", "w", encoding="utf-8") as f1:
    for (type_name, prop) in output1:
        f1.write("%s,%f\n" % (type_name, prop))

# 3 计算每个用户应还利息
## 3.1 计算

In [20]:
# Interest owed = total amount repaid over the loan term (years * monthly payment * 12)
# minus the principal that was borrowed.
total_interest = df['year_of_loan'] * df['monthly_payment'] * 12 - df['total_loan']
result2 = df.select('user_id', total_interest.alias('total_money'))
result2.show(5)

+-------+------------------+
|user_id|       total_money|
+-------+------------------+
|      0|            3846.0|
|      1|1840.6000000000004|
|      2|10465.600000000002|
|      3|1758.5200000000004|
|      4| 1056.880000000001|
+-------+------------------+
only showing top 5 rows



## 3.2 输出

In [21]:
# Bring the per-user result to the driver and write it as "user_id,total_money" lines.
output2 = result2.collect()
# Explicit encoding keeps the output identical across platforms; the `with`
# block closes the file on exit, so no separate close() call is needed.
with open(r"E:\大三上课程\金融大数据\实验\Experiment4\userTotalMoney.csv", "w", encoding="utf-8") as f2:
    for (user_id, money) in output2:
        f2.write("%s,%f\n" % (user_id, money))

# 4 筛选工作年限大于5年的用户
## 4.1 定义函数对字符串进行转换

In [22]:
def cal_work_year(work_year):
    """Convert a textual work-year description into an integer number of years.

    Args:
        work_year: A string such as '< 1 year', '3 years' or '10+ years',
            or None when the value is missing.

    Returns:
        0 when the value is missing (None or blank), 1 when the text contains
        '<', otherwise the leading number of the text with any trailing '+'
        removed.

    Raises:
        ValueError: If the leading token is not an integer after removing '+'.
    """
    # Missing or blank values count as zero years of experience.
    if work_year is None or not work_year.strip():
        return 0
    # Any "less than" description is counted as one year.
    if '<' in work_year:
        return 1
    # Split on arbitrary whitespace so leading or repeated spaces do not produce
    # an empty first token, then drop the '+' suffix of values like '10+'.
    leading_token = work_year.split()[0]
    return int(leading_token.split('+')[0])


# Wrap the parser as a Spark UDF; the declared return type is integer.
udf_cal_work_year = udf(cal_work_year, IntegerType())

# Derive the numeric work-year column, keep only rows above 5 years,
# then project the three output columns.
result3 = (
    df.withColumn('int_work_year', udf_cal_work_year(df.work_year))
    .filter(functions.col('int_work_year') > 5)
    .select('user_id', 'censor_status', 'int_work_year')
)

result3.show(5)

+-------+-------------+-------------+
|user_id|censor_status|int_work_year|
+-------+-------------+-------------+
|      1|            2|           10|
|      2|            1|           10|
|      5|            2|           10|
|      6|            0|            8|
|      7|            2|           10|
+-------+-------------+-------------+
only showing top 5 rows



## 4.2 输出

In [23]:
# Bring the filtered rows to the driver and write them as
# "user_id,censor_status,work_year" lines.
output3 = result3.collect()
# Explicit encoding keeps the output identical across platforms; the `with`
# block closes the file on exit, so no separate close() call is needed.
with open(r"E:\大三上课程\金融大数据\实验\Experiment4\censorStatusCondition.csv", "w", encoding="utf-8") as f3:
    for (user_id, censor_status, work_year) in output3:
        f3.write("%s,%s,%s\n" % (user_id, censor_status, work_year))
# All results are written; release the Spark session.
spark.stop()