In [1]:
# encoding=UTF-8
# !flask/bin/python

from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
import pandas as pd
from pyspark.sql.types import StringType, StructType, StructField


class CassandraType(object):
    PRODUCTION = 0
    TEST = 1
    TEST_DOCKER = 2


class CassandraDAO(object):

    # you have to install following items :
    # a. python-Cassandra driver
    # b. pyspark cassandra connector

    def __init__(self, type):
        #         print('runing father.__init__')
        if type == CassandraType.PRODUCTION:
            self.contact_points = ['192.168.95.127', '192.168.95.122']
            self.contact_points_str = "192.168.95.127,192.168.95.122"
        elif type == CassandraType.TEST:
            self.contact_points = ['192.168.0.41', '192.168.0.42']
            self.contact_points_str = "192.168.0.41,192.168.0.42"
        else:
            self.contact_points = ['192.168.0.121', '192.168.0.122', '192.168.0.52']
            self.contact_points_str = "192.168.0.121,192.168.0.122,192.168.0.52"

        self.formatString = "org.apache.spark.sql.cassandra"
        self.username = "username"
        self.password = "password"
        self.cluster = None
        self.session = None
        self.createSession()

    def __del__(self):
        self.cluster.shutdown()

    def pandas_factory(self, colnames, rows):
        return pd.DataFrame(rows, columns=colnames)

    def createSession(self):
        print "contact_points = " + self.contact_points_str
        self.cluster = Cluster(
            contact_points=self.contact_points,  # random select a node
            #             load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='datacenter1'),
            #         auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
        )
        self.session = self.cluster.connect()
        self.session.row_factory = self.pandas_factory
        self.session.default_fetch_size = 10000000  # needed for large queries, otherwise driver will do pagination. Default is 50000.

    def getSession(self):
        return self.session

    def execCQL(self, keyspace, cql):
        """
        execute CQL
        """
        self.session.set_keyspace(keyspace)
        self.session.execute_async(cql)

    def execCQLSelect(self, keyspace, cql):
        """
        execute CQL, select only
        """

        self.session.set_keyspace(keyspace)

        #       cassandra ResultSet
        async_results = self.session.execute_async(cql)
        return async_results

    def execCQLCallBackAnysc(self, keyspace, cql, handle_success, handle_error):
        """
        execute CQL, if success => handle_success function, else handle_error
        """
        self.session.set_keyspace(keyspace)
        async_results = self.session.execute_async(cql)
        async_results.add_callbacks(handle_success, handle_error)

    def execCQLSelectToPandasDF(self, keyspace, cql):
        """
        execute CQL, select only, return Pandas DataFrame
        """

        self.session.set_keyspace(keyspace)

        #       cassandra ResultSet
        async_results = self.session.execute_async(cql)
        #         async_results = self.session.execute_async(cql)
        #       to Pandas DataFrame
        return async_results.result()._current_rows


    def execCQLSelectToRDD(self, sqlContext, keyspace, cql):
        """
        execute CQL, select only, return Spark RDD
        """

        return self.execCQLSelectToDF(sqlContext, keyspace, cql).rdd.map(tuple)  # dataFrame to RDD

    @property
    def contactPoints(self):
        return self.contact_points

    @contactPoints.setter
    def contactPoints(self, contact_points):
        self.contact_points = contact_points

    @contactPoints.deleter
    def contactPoints(self):
        del self.contact_points

    # pyspark cassandra connector
    def readFromCassandraDF(self, sqlContext, keyspace, table):
        """
        read data from Cassandra, return Dataframe
        """

        return sqlContext.read \
            .format(self.formatString) \
            .options(table=table, keyspace=keyspace) \
            .option("spark.cassandra.connection.host", self.contact_points_str) \
            .load()

    def readFromCassandraRDD(self, sqlContext, keyspace, table):
        """
        read data from Cassandra, return RDD
        """

        df = sqlContext.read \
            .format(self.formatString) \
            .options(table=table, keyspace=keyspace) \
            .option("spark.cassandra.connection.host", self.contact_points_str) \
            .load()
        return df.rdd.map(tuple)  # dataFrame to RDD

    def saveToCassandraDF(self, dataFrame, keyspace, table, mode="error"):
        """
        Save data to Cassandra using DataFrame, select one mode to save
        
        SaveMode.ErrorIfExists (default) | "error"      When saving a DataFrame to a data source,
                                                        if data already exists, an exception is expected to be thrown.
        SaveMode.Append                  | "append"     When saving a DataFrame to a data source,
                                                        if data/table already exists, contents of the DataFrame are
                                                        expected to be appended to existing data.
        SaveMode.Overwrite               | "overwrite"  Overwrite mode means that when saving a DataFrame to a data source,
                                                        if data/table already exists, existing data is expected to be
                                                        overwritten by the contents of the DataFrame.
        SaveMode.Ignore                  | "ignore"     Ignore mode means that when saving a DataFrame to a data source,
                                                        if data already exists, the save operation is expected to not
                                                        save the contents of the DataFrame and to not change the
                                                        existing data. This is similar to a CREATE TABLE IF NOT EXISTS

                                                        in SQL.
        """

        dataFrame.write \
            .format(self.formatString) \
            .mode(mode) \
            .options(table=table, keyspace=keyspace) \
            .option("spark.cassandra.connection.host", self.contact_points_str) \
            .save()

In [2]:
import regex as re
import pandas as pd
from pyspark.sql.functions import udf, lit, lower

In [3]:
NLP_KEYSPACE = 'nlp_keyspace'
DCARD_ARTICLE_TABLE = 'dcard_article'
DCARD_RESPONSE_TABLE = 'dcard_response'
DCARD_ARTICLE_TEST_TABLE = 'dcard_article_test'
WORD_ARTICLE_MAPPING = 'word_article_mapping'
DCARD_QUERY_TABLE = 'dcard_query_table'
dao = CassandraDAO('BACKUP')

contact_points = 192.168.0.121,192.168.0.122,192.168.0.52


In [4]:
article_id = 227982483
article_id_start = 0
article_id_end = 5000

# question cleaning

In [4]:
a_df = dao.readFromCassandraDF(sqlContext, NLP_KEYSPACE, DCARD_ARTICLE_TABLE)
# a_df.limit(5).show()

In [5]:
a_df = a_df.withColumnRenamed('create_date','question_create_date')
# a_df.limit(5).show()

+----------+-----+--------+--------------------+--------------------+--------------------+-------------+----------+
|article_id|board|category|             content|          crawl_date|question_create_date|question_type|     title|
+----------+-----+--------+--------------------+--------------------+--------------------+-------------+----------+
| 224297810|  job|    null|想請問在健身房的工作內容有什麼呢需...|2019-02-15 15:49:...|2016-07-03T16:51:...|         null|健身房工作經驗（問）|
| 225961291| talk|    null|首次發文，手機排版，請見諒（跟朋友...|2019-02-19 16:25:...|2017-03-11T09:36:...|         null|  #圖 刺青的意義|
| 225196435| food|    null|                    |2019-02-13 09:39:...|2016-11-15T03:16:...|         null|      蜂蜜真假|
| 226405342| mood|    null|星巴克，旁邊一個高中生點了咖啡第二...|2019-02-27 13:32:...|2017-05-16T07:12:...|         null|  星巴克第二杯半價|
|    594410| mood|    null|假身分 part1第一次發文 
手...|2019-02-23 15:28:...|2015-09-26T16:31:...|         null| 假身分 part5|
+----------+-----+--------+--------------------+--------------------+---

In [6]:
def remove_question_tag(question):
        
    replace_str = [u'問 ',u'圖 ',u'更 ',u'轉 ',u'更新 ',u'文長 ',
                   u'閒聊 ',u'黑特 ',u'分享 ',u'慎入 ',u'圖多 ',
                   u'有雷 ',u'心得 ',u'開箱 ',u'轉載 ',u'情報 ']
    for s in replace_str:
        question = question.replace(s, u'')
    question = re.sub(u"\\(.*?\\)|\\{.*?}|\\[.*?]|\\［.*?］|\\（.*?）|\\【.*?】|\\#.*? |\\#.*?#", u"", question)
    question = re.sub(u'(Re:|re:)', u'', question)
    return question

remove_question_tag_udf = udf(remove_question_tag)
a_df = a_df.select('article_id', 'board', 'question_create_date',
                    remove_question_tag_udf(a_df['title']).alias('question'),
                    )

In [7]:
# a_df.limit(5).show()

In [20]:
a_df.count()

384162

# response cleaning

In [6]:
r_df = dao.readFromCassandraDF(sqlContext, NLP_KEYSPACE, DCARD_RESPONSE_TABLE)
# r_df = r_df.withColumn("response", r_df['content'])
r_df = r_df.withColumnRenamed('content','response')
r_df = r_df.withColumnRenamed('create_date','response_create_date')
r_df.limit(5).toPandas()

Unnamed: 0,article_id,floor,response,crawl_date,response_create_date,like_count
0,224297810,1,"世界健身房 \n櫃檯,過卡,發毛巾,發車票,有客人進來需喊歡迎光臨 \n教練,業績壓力比較...",2019-02-15 15:48:20.705,2016-07-03T16:59:42.671Z,3
1,225961291,1,刺青不是壞事，是故事。\n刺青是自己的事，關他們屁事。,2019-02-19 16:23:35.220,2017-03-11T09:41:49.898Z,9
2,225961291,2,我聞到了靈異刺青,2019-02-19 16:22:31.888,2017-03-11T11:15:52.118Z,5
3,225961291,4,我一直也很想刺有意義的圖，但現在的人對刺青的接受度還是不夠....（尤其是老一輩）\n怕男友...,2019-02-19 16:25:10.227,2017-03-11T14:34:32.081Z,0
4,225961291,7,原po你好 聽完你的故事我很感動\n方不方便請問你是在哪裡刺的呢？,2019-02-19 16:23:35.228,2017-03-11T21:21:08.096Z,0


In [57]:
# print r_df.count()

In [58]:
# r_df.select('article_id').distinct().count()

In [59]:
def has_url(response):
        
    rule = 'http://\S+|https://\S+'
    pattern = re.compile(rule)
    match = pattern.search(response)
    if match:  
       return True 

#     rule = '(.png|.jpg|.jpeg|.gif|.svg|.txt)'
#     pattern = re.compile(rule)
#     match = pattern.search(response)
#     if match:  
#        return True 
    
    return False

has_url_udf = udf(has_url)
r_df = r_df.select('article_id', 'floor', 'response', 'like_count', 'response_create_date',
                    has_url_udf(r_df['response']).alias('has_url')
                    )

# r_df.limit(5).show()

In [60]:
def has_reply_tag(response):
        
    rule = '(B[\d+]|b[\d+]|[\d+]F|[\d+]f|[\d+]樓|樓上|樓下|樓主|原po|原PO|原Po|原波|+1)'
    pattern = re.compile(rule)
    match = pattern.search(response)
    if match:  
        return True 

    return False

has_reply_tag_udf = udf(has_reply_tag)
r_df = r_df.select('article_id', 'floor', 'response', 'like_count','response_create_date',
                   'has_url',
                    has_reply_tag_udf(r_df['response']).alias('has_reply_tag')
                    )

In [61]:
def response_too_long(response):
        
    if len(response) > 40:  
        return True 

    return False

response_too_long_udf = udf(response_too_long)
r_df = r_df.select('article_id', 'floor', 'response', 'like_count','response_create_date',
                   'has_url', 'has_reply_tag',
                    response_too_long_udf(r_df['response']).alias('response_too_long')
                    )

In [62]:
# r_df.limit(1000).filter(r_df['has_floor_tag'] == True).show()

In [63]:
# 去除has_url的row or 去除has_floor的row
r_df = r_df.filter((r_df['has_url'] == False) &
                   (r_df['has_reply_tag'] == False) &
                   (r_df['response_too_long'] == False))
r_df = r_df.select('article_id', 'floor', 'response', 'like_count','response_create_date')
print r_df.count()
# r_df.limit(5).show()

3055594


In [None]:
# # test no response
# import numpy as np
# columns = ['article_id', 'floor', 'response', 'like_count']
# empty_df = pd.DataFrame({}, columns =columns)
# empty_df

# create query table

In [65]:
ans_df = a_df.join(r_df, ['article_id'], "inner")
# ans_df.limit(100).toPandas()
print ans_df.count()

3007093


In [None]:
# ans_df = ans_df.na.drop()
# print ans_df.count()

In [66]:
# like_count 分群
top = 15#int(len(ans_df)*0.25)
ans_df.limit(top).toPandas()

Unnamed: 0,article_id,board,question,floor,response,like_count
0,10156,talk,校園聊天加好友呢？,1,這樣感覺女生留言會被加到爆ㄎㄎ,0
1,10156,talk,校園聊天加好友呢？,2,對女生造成的負擔應該遠大於系統..,0
2,10156,talk,校園聊天加好友呢？,3,這樣Dcard的特色就不見了阿~~,0
3,10156,talk,校園聊天加好友呢？,4,對方不會知道誰邀請她啦XD\n也對，這樣好像就少了每天的等待有點可惜\n,0
4,10156,talk,校園聊天加好友呢？,5,感覺不錯 希望能夠擁有這功能,0
5,10156,talk,校園聊天加好友呢？,6,增加發卡機率呢？XD,0
6,10422,talk,唬人幫加油!!,1,抽到我記得加我嘿!!,0
7,10422,talk,唬人幫加油!!,2,XD 凌晨三點半開dcard 同學真不簡單xd\n,0
8,10422,talk,唬人幫加油!!,3,我都沒抽過同校的!!,0
9,10422,talk,唬人幫加油!!,4,我已經遇過超過10個同校的了XD,0


In [67]:
dao.saveToCassandraDF(ans_df, NLP_KEYSPACE, DCARD_QUERY_TABLE, "append")