In [1]:
from pyspark import SparkConf, SparkContext   #导包
conf = SparkConf().setMaster("local").setAppName("rdd")  
sc = SparkContext(conf = conf)

In [3]:
#从hdfs中读取数据集并进行数据清洗
lines = sc.textFile("hdfs://192.168.43.191:9000/adult2.txt")
lines1 = lines.filter(lambda line:(len(line.split(","))==15))
lines1.take(10)

['25, Private,226802, 11th,7, Never-married, Machine-op-inspct, Own-child, Black, Male,0,0,40, United-States, <=50K',
 '38, Private,89814, HS-grad,9, Married-civ-spouse, Farming-fishing, Husband, White, Male,0,0,50, United-States, <=50K',
 '28, Local-gov,336951, Assoc-acdm,12, Married-civ-spouse, Protective-serv, Husband, White, Male,0,0,40, United-States, >50K',
 '44, Private,160323, Some-college,10, Married-civ-spouse, Machine-op-inspct, Husband, Black, Male,7688,0,40, United-States, >50K',
 '34, Private,198693, 10th,6, Never-married, Other-service, Not-in-family, White, Male,0,0,30, United-States, <=50K',
 '63, Self-emp-not-inc,104626, Prof-school,15, Married-civ-spouse, Prof-specialty, Husband, White, Male,3103,0,32, United-States, >50K',
 '24, Private,369667, Some-college,10, Never-married, Other-service, Unmarried, White, Female,0,0,40, United-States, <=50K',
 '55, Private,104996, 7th-8th,4, Married-civ-spouse, Craft-repair, Husband, White, Male,0,0,10, United-States, <=50K',
 '6

In [2]:
lines = sc.textFile("adult2.txt")
#数据清洗 把长度不为15的都去掉
lines1 = lines.filter(lambda line:(len(line.split(","))==15))
lines1.take(10)

['25, Private,226802, 11th,7, Never-married, Machine-op-inspct, Own-child, Black, Male,0,0,40, United-States, <=50K',
 '38, Private,89814, HS-grad,9, Married-civ-spouse, Farming-fishing, Husband, White, Male,0,0,50, United-States, <=50K',
 '28, Local-gov,336951, Assoc-acdm,12, Married-civ-spouse, Protective-serv, Husband, White, Male,0,0,40, United-States, >50K',
 '44, Private,160323, Some-college,10, Married-civ-spouse, Machine-op-inspct, Husband, Black, Male,7688,0,40, United-States, >50K',
 '34, Private,198693, 10th,6, Never-married, Other-service, Not-in-family, White, Male,0,0,30, United-States, <=50K',
 '63, Self-emp-not-inc,104626, Prof-school,15, Married-civ-spouse, Prof-specialty, Husband, White, Male,3103,0,32, United-States, >50K',
 '24, Private,369667, Some-college,10, Never-married, Other-service, Unmarried, White, Female,0,0,40, United-States, <=50K',
 '55, Private,104996, 7th-8th,4, Married-civ-spouse, Craft-repair, Husband, White, Male,0,0,10, United-States, <=50K',
 '6

In [3]:
from pyspark import SparkContext,SparkConf        #导包
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() #创建sparksession对象
#使用编程方式定义RDD
from pyspark.sql.types import *    
from pyspark.sql import Row
#生成表头
schemaString = "age workclass fnlwgt education education-num marital-status occupation relationship race sex capital-gain capital-loss hours-per-week native-country income"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
#生成表的记录
parts = lines1.map(lambda x:x.split(","))
adult = parts.map(lambda p:Row(p[0].strip(),p[1].strip(),p[2].strip(),p[3].strip(),p[4].strip(),p[5].strip(),p[6].strip(),p[7].strip(),p[8].strip(),p[9].strip(),p[10].strip(),p[11].strip(),p[12].strip(),p[13].strip(),p[14].strip()))   #strip() 删除空白符
#将表头和表中记录拼接
df = spark.createDataFrame(adult,schema)
#展示
df.show()

+---+----------------+------+------------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education-num|    marital-status|       occupation| relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+------------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 25|         Private|226802|        11th|            7|     Never-married|Machine-op-inspct|    Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|
| 38|         Private| 89814|     HS-grad|            9|Married-civ-spouse|  Farming-fishing|      Husband|White|  Male|           0|           0|            50| United-States| <=50K|
| 28|       Local-gov|336951|  Assoc-acdm|           12|Married-civ-spouse|  Pro

In [4]:
df.distinct()  #去重

DataFrame[age: string, workclass: string, fnlwgt: string, education: string, education-num: string, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: string, capital-loss: string, hours-per-week: string, native-country: string, income: string]

In [5]:
#删除不必要的数据列
df.drop("fnlwgt")

DataFrame[age: string, workclass: string, education: string, education-num: string, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: string, capital-loss: string, hours-per-week: string, native-country: string, income: string]

In [6]:
#选择age列和income列 分析age对income的影响
df_age = df.select(df["age"],df["income"])
df_age.show()

+---+------+
|age|income|
+---+------+
| 25| <=50K|
| 38| <=50K|
| 28|  >50K|
| 44|  >50K|
| 34| <=50K|
| 63|  >50K|
| 24| <=50K|
| 55| <=50K|
| 65|  >50K|
| 36| <=50K|
| 26| <=50K|
| 48|  >50K|
| 43|  >50K|
| 20| <=50K|
| 43| <=50K|
| 37| <=50K|
| 34|  >50K|
| 34| <=50K|
| 25| <=50K|
| 25| <=50K|
+---+------+
only showing top 20 rows



In [7]:
from pyspark.sql import functions
#使用lit方法添加常量数据  withColumn方法新增一列数据

In [8]:
#age 17~30 income>50
df_age_1 = df_age.filter(df_age["age"]>17).filter(df_age["age"]<30).filter(df_age["income"]==">50K").groupBy("income").count()
df_age_1 = df_age_1.withColumn("age_stage",functions.lit("青年"))
df_age_1.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
|  >50K|  721|     青年|
+------+-----+---------+



In [9]:
#age 17~30 income<=50
df_age_2 = df_age.filter(df_age["age"]>17).filter(df_age["age"]<30).filter(df_age["income"]=="<=50K").groupBy("income").count()
df_age_2 = df_age_2.withColumn("age_stage",functions.lit("青年"))
df_age_2.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
| <=50K|11831|     青年|
+------+-----+---------+



In [10]:
#age 30~65 income>50
df_age_3 = df_age.filter(df_age["age"]>=30).filter(df_age["age"]<65).filter(df_age["income"]==">50K").groupBy("income").count()
df_age_3 = df_age_3.withColumn("age_stage",functions.lit("中年"))
df_age_3.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
|  >50K|10141|     中年|
+------+-----+---------+



In [11]:
#age 30~65 income<=50
df_age_4 = df_age.filter(df_age["age"]>=30).filter(df_age["age"]<65).filter(df_age["income"]=="<=50K").groupBy("income").count()
df_age_4 = df_age_4.withColumn("age_stage",functions.lit("中年"))
df_age_4.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
| <=50K|20475|     中年|
+------+-----+---------+



In [12]:
#age 65~100 income>50
df_age_5 = df_age.filter(df_age["age"]>=65).filter(df_age["age"]<100).filter(df_age["income"]==">50K").groupBy("income").count()
df_age_5 = df_age_5.withColumn("age_stage",functions.lit("老年"))
df_age_5.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
|  >50K|  346|     老年|
+------+-----+---------+



In [13]:
#age 65~100 income<=50
df_age_6 = df_age.filter(df_age["age"]>=65).filter(df_age["age"]<100).filter(df_age["income"]=="<=50K").groupBy("income").count()
df_age_6 = df_age_6.withColumn("age_stage",functions.lit("老年"))
df_age_6.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
| <=50K| 1215|     老年|
+------+-----+---------+



In [14]:
#合并多个df
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    return reduce(DataFrame.unionAll,dfs)

df_list=[df_age_1,df_age_2,df_age_3,df_age_4,df_age_5,df_age_6]
df_age_merge = unionAll(*df_list)
df_age_merge.show()

+------+-----+---------+
|income|count|age_stage|
+------+-----+---------+
|  >50K|  721|     青年|
| <=50K|11831|     青年|
|  >50K|10141|     中年|
| <=50K|20475|     中年|
|  >50K|  346|     老年|
| <=50K| 1215|     老年|
+------+-----+---------+



In [19]:
df_age_merge.printSchema()

root
 |-- income: string (nullable = true)
 |-- count: long (nullable = false)
 |-- age_stage: string (nullable = false)



In [22]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_age_merge.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'age','append', prop)

In [None]:
#对性别分析

In [23]:
df_sex = df.select(df["sex"],df["income"])
df_sex.show()

+------+------+
|   sex|income|
+------+------+
|  Male| <=50K|
|  Male| <=50K|
|  Male|  >50K|
|  Male|  >50K|
|  Male| <=50K|
|  Male|  >50K|
|Female| <=50K|
|  Male| <=50K|
|  Male|  >50K|
|  Male| <=50K|
|Female| <=50K|
|  Male|  >50K|
|  Male|  >50K|
|  Male| <=50K|
|Female| <=50K|
|Female| <=50K|
|  Male|  >50K|
|Female| <=50K|
|  Male| <=50K|
|  Male| <=50K|
+------+------+
only showing top 20 rows



In [24]:
# >50K 
df_sex_1 = df_sex.filter(df_sex["income"]==">50K").groupBy("sex").count()
df_sex_1 = df_sex_1.withColumn("income",functions.lit(">50K"))
df_sex_1.show()

+------+-----+------+
|   sex|count|income|
+------+-----+------+
|Female| 1669|  >50K|
|  Male| 9539|  >50K|
+------+-----+------+



In [25]:
# <=50K 
df_sex_2 = df_sex.filter(df_sex["income"]=="<=50K").groupBy("sex").count()
df_sex_2 = df_sex_2.withColumn("income",functions.lit("<=50K"))
df_sex_2.show()

+------+-----+------+
|   sex|count|income|
+------+-----+------+
|Female|13026| <=50K|
|  Male|20988| <=50K|
+------+-----+------+



In [26]:
#合并多个df
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    return reduce(DataFrame.unionAll,dfs)

df_list=[df_sex_1,df_sex_2]
df_sex_merge = unionAll(*df_list)
df_sex_merge.show()

+------+-----+------+
|   sex|count|income|
+------+-----+------+
|Female| 1669|  >50K|
|  Male| 9539|  >50K|
|Female|13026| <=50K|
|  Male|20988| <=50K|
+------+-----+------+



In [54]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_sex_1.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'sex_1','append', prop)

In [55]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_sex_2.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'sex_2','append', prop)

In [28]:
#对种族分析
df_race = df.select(df["race"],df["income"])


In [29]:
#  >50K 
df_race_1 = df_race.filter(df_race["income"]==">50K").groupBy("race").count()
df_race_1 = df_race_1.withColumn("income",functions.lit(">50K"))
df_race_1.show()

+------------------+-----+------+
|              race|count|income|
+------------------+-----+------+
|             Other|   45|  >50K|
|Amer-Indian-Eskimo|   53|  >50K|
|             White|10207|  >50K|
|Asian-Pac-Islander|  369|  >50K|
|             Black|  534|  >50K|
+------------------+-----+------+



In [56]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_race_1.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'race_1','append', prop)

In [30]:
#  <=50K 
df_race_2 = df_race.filter(df_race["income"]=="<=50K").groupBy("race").count()
df_race_2 = df_race_2.withColumn("income",functions.lit("<=50K"))
df_race_2.show()

+------------------+-----+------+
|              race|count|income|
+------------------+-----+------+
|             Other|  308| <=50K|
|Amer-Indian-Eskimo|  382| <=50K|
|             White|28696| <=50K|
|Asian-Pac-Islander|  934| <=50K|
|             Black| 3694| <=50K|
+------------------+-----+------+



In [57]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_race_2.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'race_2','append', prop)

In [32]:
#对学历分析
df_education = df.select(df["education"],df["income"])
#  >50K 
df_education_1 = df_education.filter(df_education["income"]==">50K").groupBy("education").count()
df_education_1 = df_education_1.withColumn("income",functions.lit(">50K"))
#  <=50K 
df_education_2 = df_education.filter(df_education["income"]=="<=50K").groupBy("education").count()
df_education_2 = df_education_2.withColumn("income",functions.lit("<=50K"))

+------------+-----+------+
|   education|count|income|
+------------+-----+------+
|     Masters| 1393|  >50K|
|        10th|   82|  >50K|
|     5th-6th|   22|  >50K|
|  Assoc-acdm|  398|  >50K|
|   Assoc-voc|  504|  >50K|
|     7th-8th|   55|  >50K|
|         9th|   38|  >50K|
|     HS-grad| 2416|  >50K|
|   Bachelors| 3178|  >50K|
|        11th|   89|  >50K|
|     1st-4th|    8|  >50K|
|   Preschool|    1|  >50K|
|        12th|   43|  >50K|
|   Doctorate|  399|  >50K|
|Some-college| 1990|  >50K|
| Prof-school|  592|  >50K|
|        10th| 1141| <=50K|
|     Masters| 1121| <=50K|
|     5th-6th|  427| <=50K|
|  Assoc-acdm| 1109| <=50K|
+------------+-----+------+
only showing top 20 rows



In [58]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_education_1.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'education_1','append', prop)

In [59]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_education_2.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'education_2','append', prop)

In [34]:
#对职业分析
df_occupation = df.select(df["occupation"],df["income"])
#  >50K 
df_occupation_1 = df_occupation.filter(df_occupation["income"]==">50K").groupBy("occupation").count()
df_occupation_1 = df_occupation_1.withColumn("income",functions.lit(">50K"))
#  <=50K 
df_occupation_2 = df_occupation.filter(df_occupation["income"]=="<=50K").groupBy("occupation").count()
df_occupation_2 = df_occupation_2.withColumn("income",functions.lit("<=50K"))


+-----------------+-----+------+
|       occupation|count|income|
+-----------------+-----+------+
|            Sales| 1455|  >50K|
|  Exec-managerial| 2867|  >50K|
|   Prof-specialty| 2704|  >50K|
|Handlers-cleaners|  135|  >50K|
|  Farming-fishing|  172|  >50K|
|     Craft-repair| 1355|  >50K|
| Transport-moving|  478|  >50K|
|  Priv-house-serv|    3|  >50K|
|  Protective-serv|  307|  >50K|
|    Other-service|  196|  >50K|
|     Tech-support|  411|  >50K|
|Machine-op-inspct|  365|  >50K|
|     Armed-Forces|    4|  >50K|
|     Adm-clerical|  756|  >50K|
|            Sales| 3953| <=50K|
|  Exec-managerial| 3117| <=50K|
|   Prof-specialty| 3304| <=50K|
|Handlers-cleaners| 1911| <=50K|
|  Farming-fishing| 1308| <=50K|
|     Craft-repair| 4665| <=50K|
+-----------------+-----+------+
only showing top 20 rows



In [60]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_occupation_1.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'occupation_1','append', prop)

In [61]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_occupation_2.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'occupation_2','append', prop)

In [62]:
#对每周工作时间分析
df_hours = df.select(df["hours-per-week"],df["income"])
from pyspark.sql import functions
# 0~35 income>50
df_hours_1 = df_hours.filter(df_hours["hours-per-week"]>0).filter(df_hours["hours-per-week"]<35).filter(df_age["income"]==">50K").groupBy("income").count().withColumn("hours_stage",functions.lit("shortTime"))
# 35~65 income>50
df_hours_2 = df_hours.filter(df_hours["hours-per-week"]>=35).filter(df_hours["hours-per-week"]<65).filter(df_age["income"]==">50K").groupBy("income").count().withColumn("hours_stage",functions.lit("middleTime"))
# 65~100 income>50
df_hours_3 = df_hours.filter(df_hours["hours-per-week"]>=65).filter(df_hours["hours-per-week"]<100).filter(df_age["income"]==">50K").groupBy("income").count().withColumn("hours_stage",functions.lit("loneTime"))
#合并多个df
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    return reduce(DataFrame.unionAll,dfs)

df_list=[df_hours_1,df_hours_2,df_hours_3]
df_hours_merge = unionAll(*df_list)
df_hours_merge.show()

+------+-----+-----------+
|income|count|hours_stage|
+------+-----+-----------+
|  >50K|  481|  shortTime|
|  >50K|10164| middleTime|
|  >50K|  563|   loneTime|
+------+-----+-----------+



In [64]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_hours_merge.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'hours_1','append', prop)

In [63]:
#对每周工作时间分析
df_hours = df.select(df["hours-per-week"],df["income"])
from pyspark.sql import functions
# 0~35 income<=50
df_hours_4 = df_hours.filter(df_hours["hours-per-week"]>0).filter(df_hours["hours-per-week"]<35).filter(df_age["income"]=="<=50K").groupBy("income").count().withColumn("hours_stage",functions.lit("shortTime"))
# 35~65 income<=50
df_hours_5 = df_hours.filter(df_hours["hours-per-week"]>=35).filter(df_hours["hours-per-week"]<65).filter(df_age["income"]=="<=50K").groupBy("income").count().withColumn("hours_stage",functions.lit("middleTime"))
# 65~100 income<=50
df_hours_6 = df_hours.filter(df_hours["hours-per-week"]>=65).filter(df_hours["hours-per-week"]<100).filter(df_age["income"]=="<=50K").groupBy("income").count().withColumn("hours_stage",functions.lit("loneTime"))
#合并多个df
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    return reduce(DataFrame.unionAll,dfs)

df_list=[df_hours_4,df_hours_5,df_hours_6]
df_hours_merge_2 = unionAll(*df_list)
df_hours_merge_2.show()

+------+-----+-----------+
|income|count|hours_stage|
+------+-----+-----------+
| <=50K| 6543|  shortTime|
| <=50K|26507| middleTime|
| <=50K|  964|   loneTime|
+------+-----+-----------+



In [65]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_hours_merge_2.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'hours_2','append', prop)

In [67]:

df.describe()

DataFrame[summary: string, age: string, workclass: string, fnlwgt: string, education: string, education-num: string, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: string, capital-loss: string, hours-per-week: string, native-country: string, income: string]

In [68]:
df_country = df.select(df["native-country"],df["income"])
#  >50K 
df_country_1 = df_country.filter(df_country["income"]==">50K").groupBy("native-country").count()
df_country_1 = df_country_1.withColumn("income",functions.lit(">50K"))
df_country_1.show()

+--------------+-----+------+
|native-country|count|income|
+--------------+-----+------+
|   Philippines|   84|  >50K|
|       Germany|   58|  >50K|
|      Cambodia|    9|  >50K|
|        France|   16|  >50K|
|        Greece|   18|  >50K|
|        Taiwan|   25|  >50K|
|       Ecuador|    6|  >50K|
|     Nicaragua|    3|  >50K|
|          Hong|    8|  >50K|
|          Peru|    4|  >50K|
|         China|   36|  >50K|
|         India|   62|  >50K|
|         Italy|   33|  >50K|
|          Cuba|   34|  >50K|
|         South|   18|  >50K|
|          Iran|   22|  >50K|
|       Ireland|   10|  >50K|
|      Thailand|    5|  >50K|
|          Laos|    2|  >50K|
|   El-Salvador|   11|  >50K|
+--------------+-----+------+
only showing top 20 rows



In [69]:
#  >50K 
df_country_2 = df_country.filter(df_country["income"]=="<=50K").groupBy("native-country").count()
df_country_2 = df_country_2.withColumn("income",functions.lit("<=50K"))
df_country_2.show()

+------------------+-----+------+
|    native-country|count|income|
+------------------+-----+------+
|       Philippines|  199| <=50K|
|           Germany|  135| <=50K|
|          Cambodia|   17| <=50K|
|            France|   20| <=50K|
|            Greece|   31| <=50K|
|            Taiwan|   30| <=50K|
|           Ecuador|   37| <=50K|
|         Nicaragua|   45| <=50K|
|              Hong|   20| <=50K|
|              Peru|   41| <=50K|
|             India|   85| <=50K|
|             China|   77| <=50K|
|             Italy|   67| <=50K|
|Holand-Netherlands|    1| <=50K|
|              Cuba|   99| <=50K|
|             South|   83| <=50K|
|              Iran|   34| <=50K|
|           Ireland|   26| <=50K|
|          Thailand|   24| <=50K|
|              Laos|   19| <=50K|
+------------------+-----+------+
only showing top 20 rows



In [70]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_country_1.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'country_1','append', prop)

In [71]:
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = '1'
prop['driver'] = "com.mysql.jdbc.Driver"
df_country_2.write.jdbc("jdbc:mysql://192.168.43.190:3306/spark?serverTimezone=UTC&&useSSL=false",'country_2','append', prop)