In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark mobike session").master("spark://spark-master:7077").config("spark.executor.memory","512m").getOrCreate()

In [10]:
mobike_df = spark.read.csv("mobike.csv", sep = ",", header = True)
weibo_df = spark.read.csv("weibo.csv", sep = ";", header = True)

In [11]:
weibo_df1 = weibo_df.select(['location_name', 'location_latitude', 'location_longitude'])
mobike_df1 = mobike_df.select(['bikeid', 'start_time', 'end_time','start_location_x', 'start_location_y', 'end_location_x', 'end_location_y'])

In [4]:
mobike_df1.count()

1023603

In [12]:
#weibo_df1.count()
weibo_df1.describe()

DataFrame[summary: string, location_name: string, location_latitude: string, location_longitude: string]

In [6]:
mobike_df1.show()

+------+----------------+----------------+----------------+----------------+--------------+--------------+
|bikeid|      start_time|        end_time|start_location_x|start_location_y|end_location_x|end_location_y|
+------+----------------+----------------+----------------+----------------+--------------+--------------+
|324731|2016-08-28 00:00|2016-08-28 00:08|          121.49|           31.28|       121.486|        31.273|
|288841|2016-08-28 00:00|2016-08-28 00:17|         121.462|          31.313|       121.455|        31.315|
|315873|2016-08-28 00:00|2016-08-28 00:14|         121.416|          31.154|       121.419|        31.156|
| 93155|2016-08-28 00:00|2016-08-28 00:12|         121.436|           31.32|       121.444|         31.31|
|352484|2016-08-28 00:00|2016-08-28 00:08|         121.459|          31.325|        121.46|        31.315|
|127840|2016-08-28 00:00|2016-08-28 00:07|         121.353|          31.285|       121.362|        31.283|
|125990|2016-08-28 00:00|2016-08-28 0

In [12]:
mobike_df1 = mobike_df1.withColumnRenamed('start_location_x', 'start_longitude')
mobike_df1 =mobike_df1.withColumnRenamed('start_location_y', 'start_latitude')
mobike_df1 =mobike_df1.withColumnRenamed('end_location_x', 'end_longitude')
mobike_df1 =mobike_df1.withColumnRenamed('end_location_y', 'end_latitude')

In [8]:
mobike_df1.show()

+------+----------------+----------------+---------------+--------------+-------------+------------+
|bikeid|      start_time|        end_time|start_longitude|start_latitude|end_longitude|end_latitude|
+------+----------------+----------------+---------------+--------------+-------------+------------+
|324731|2016-08-28 00:00|2016-08-28 00:08|         121.49|         31.28|      121.486|      31.273|
|288841|2016-08-28 00:00|2016-08-28 00:17|        121.462|        31.313|      121.455|      31.315|
|315873|2016-08-28 00:00|2016-08-28 00:14|        121.416|        31.154|      121.419|      31.156|
| 93155|2016-08-28 00:00|2016-08-28 00:12|        121.436|         31.32|      121.444|       31.31|
|352484|2016-08-28 00:00|2016-08-28 00:08|        121.459|        31.325|       121.46|      31.315|
|127840|2016-08-28 00:00|2016-08-28 00:07|        121.353|        31.285|      121.362|      31.283|
|125990|2016-08-28 00:00|2016-08-28 00:30|        121.441|        31.228|      121.455|    

In [13]:
from datetime import datetime

mobike_start_rdd = mobike_df1.rdd.map(lambda x: [x.bikeid, datetime.fromisoformat(x.start_time).isoweekday(), datetime.fromisoformat(x.start_time).hour, datetime.fromisoformat(x.start_time).minute, x.start_longitude,x.start_latitude])
mobike_end_rdd = mobike_df1.rdd.map(lambda x: [x.bikeid, datetime.fromisoformat(x.start_time).isoweekday(), datetime.fromisoformat(x.end_time).hour, datetime.fromisoformat(x.end_time).minute, x.end_longitude,x.end_latitude])

In [14]:
start_df = spark.createDataFrame(mobike_start_rdd, schema=['bikeid', 'weekday', 'hour', 'minute', 'location_longitude', 'location_latitude'])
end_df = spark.createDataFrame(mobike_end_rdd, schema=['bikeid', 'weekday', 'hour', 'minute', 'location_longitude', 'location_latitude'])

In [7]:
start_df.count()

1023603

In [20]:
end_df.show()

+------+-------+----+------+------------------+-----------------+
|bikeid|weekday|hour|minute|location_longitude|location_latitude|
+------+-------+----+------+------------------+-----------------+
|324731|      7|   0|     8|           121.486|           31.273|
|288841|      7|   0|    17|           121.455|           31.315|
|315873|      7|   0|    14|           121.419|           31.156|
| 93155|      7|   0|    12|           121.444|            31.31|
|352484|      7|   0|     8|            121.46|           31.315|
|127840|      7|   0|     7|           121.362|           31.283|
|125990|      7|   0|    30|           121.455|            31.19|
|346549|      7|   0|    31|           121.423|           31.268|
|352552|      7|   0|    14|           121.518|           31.291|
|351480|      7|   0|    10|           121.453|           31.311|
|130705|      7|   0|    16|           121.405|           31.312|
|348492|      7|   1|     4|           121.369|            31.18|
|105800|  

In [15]:
from pyspark.sql.functions import col
weibo_df1 = weibo_df1.filter(col('location_longitude').cast("float").isNotNull())
weibo_df1 = weibo_df1.filter(col('location_latitude').cast("float").isNotNull())
weibo_rdd = weibo_df1.rdd.map(lambda x: [x.location_name, round(float(x.location_longitude), 3), round(float(x.location_latitude), 3)])
weibo_df2 = spark.createDataFrame(weibo_rdd, schema=['location_name', 'location_longitude', 'location_latitude'])

In [9]:
weibo_df2.count()

502574

In [16]:
weibo_df2 = weibo_df2.drop_duplicates(['location_name'])

In [17]:
start_df1 = start_df.join(weibo_df2, on = ['location_longitude', 'location_latitude'])

In [18]:
end_df1 = end_df.join(weibo_df2, on = ['location_longitude', 'location_latitude'])

In [13]:
start_df1.count()

1265911

In [14]:
end_df1.count()

1246067

In [16]:
start_df1.groupby(['location_name', 'location_longitude', 'location_latitude', 'weekday', 'hour', 'minute']).agg({'bikeid': 'count'}).show()

+------------------------+------------------+-----------------+-------+----+------+-------------+
|           location_name|location_longitude|location_latitude|weekday|hour|minute|count(bikeid)|
+------------------------+------------------+-----------------+-------+----+------+-------------+
|                世纪联华|           121.373|           31.217|      3|  15|    18|            1|
|        上海西华酒店公寓|           121.431|           31.204|      3|  23|    34|            1|
|上海市心理咨询与治疗中心|           121.444|            31.19|      1|   9|    53|            1|
|                爱邦大厦|           121.444|            31.19|      5|  18|    53|            1|
|            明园森林都市|            121.45|            31.29|      1|   9|     3|            1|
|       避风塘 延长中路店|           121.452|           31.274|      2|  20|    24|            1|
|                  安乐坊|           121.453|           31.229|      6|  10|    28|            1|
|             ROUGE SALON|            121.46|           31.221|  

In [19]:
start_df2 = start_df1.groupby(['location_name', 'location_longitude', 'location_latitude', 'weekday', 'hour']).agg({'bikeid': 'count'})
end_df2 = end_df1.groupby(['location_name', 'location_longitude', 'location_latitude', 'weekday', 'hour']).agg({'bikeid': 'count'})

In [20]:
start_df2 = start_df2.withColumnRenamed("count(bikeid)", 'bike_popularity')
end_df2 = end_df2.withColumnRenamed("count(bikeid)", 'bike_popularity')

In [19]:
start_df2.show()

+--------------------------+------------------+-----------------+-------+----+------+---------------+
|             location_name|location_longitude|location_latitude|weekday|hour|minute|bike_popularity|
+--------------------------+------------------+-----------------+-------+----+------+---------------+
|          峥峥饮食牛肉拉面|           121.421|           31.224|      3|   7|     9|              1|
|                汇都精作坊|           121.421|           31.224|      2|  19|    29|              1|
|上海银星皇冠假日酒店碧玉厅|           121.426|           31.206|      3|  18|    15|              1|
|                    新會苑|           121.434|            31.24|      6|  15|    20|              1|
|              中环现代大厦|           121.434|            31.24|      6|  21|    18|              1|
|             The Apartment|           121.441|           31.213|      2|  12|    59|              1|
|       JZ club(爵士俱乐部)|           121.441|           31.213|      2|  11|    21|              1|
|     Rhumerie Bount

In [21]:
start_df2 = start_df2.orderBy(["bike_popularity"], ascending = [0])

In [22]:
end_df2 = end_df2.orderBy(["bike_popularity"], ascending = [0])

In [2]:
import pymysql
con = pymysql.connect(host = 'mysql', port = 3306, user='root', password='root', charset = 'utf8')

In [5]:
sql = "CREATE DATABASE IF NOT EXISTS mobike"

cursor = con.cursor()

cursor.execute(sql)

1

In [6]:
sql = "USE mobike"
cursor.execute(sql)

0

In [7]:
sql = '''CREATE TABLE `start` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `location_name` CHAR(50) ,
  `location_longitude` FLOAT NOT NULL,
  `location_latitude` FLOAT NOT NULL,
  `weekday` INT NOT NULL,
  `hour` INT NOT NULL,
  `bike_popularity` INT NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
'''
cursor.execute(sql)

0

In [8]:
sql = '''CREATE TABLE `end` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `location_name` CHAR(50) ,
  `location_longitude` FLOAT NOT NULL,
  `location_latitude` FLOAT NOT NULL,
  `weekday` INT NOT NULL,
  `hour` INT NOT NULL,
  `bike_popularity` INT NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
'''
cursor.execute(sql)

0

In [23]:
dataCollect = start_df2.rdd.toLocalIterator()

In [27]:
for x in dataCollect:
    sql = "INSERT INTO start(location_name, location_longitude, location_latitude, weekday, hour, bike_popularity) \
           VALUES ('%s', '%s',  %s,  '%s',  %s, %s)" % \
           (x.location_name, x.location_longitude, x.location_latitude, x.weekday, x.hour, x.bike_popularity)
    try:
       # 执行sql语句
       cursor.execute(sql)
       # 执行sql语句
       con.commit()
    except:
       # 发生错误时回滚
       con.rollback()

In [30]:
enddataCollect = end_df2.rdd.toLocalIterator()
for x in enddataCollect:
    sql = "INSERT INTO end(location_name, location_longitude, location_latitude, weekday, hour, bike_popularity) \
           VALUES ('%s', '%s',  %s,  '%s',  %s, %s)" % \
           (x.location_name, x.location_longitude, x.location_latitude, x.weekday, x.hour, x.bike_popularity)
    try:
       # 执行sql语句
       cursor.execute(sql)
       # 执行sql语句
       con.commit()
    except:
       # 发生错误时回滚
       con.rollback()