In [1]:
#引入spark
import findspark
findspark.init()
import pyspark

#sparkSQL
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Jieba word segmentation
import jieba
import jieba.analyse
# Load the custom dictionary BEFORE enabling parallel mode: enable_parallel()
# initialises jieba and forks its worker pool right away, so words added with
# load_userdict afterwards exist only in the parent process and parallel
# jieba.cut calls would never see them.
jieba.load_userdict('/notebooks/ciku/words-360w.res.txt')
# These stop words are consulted by jieba.analyse (keyword extraction);
# jieba.cut itself does not filter them.
jieba.analyse.set_stop_words('/notebooks/ciku/chinese_stopword.txt')
jieba.enable_parallel(4)

#BeautifulSoup,剔除HTML标签
from BeautifulSoup import BeautifulSoup

#图表生成
import matplotlib.pyplot as plt
%matplotlib inline
#词云
from wordcloud import WordCloud

# Cleaning rules applied to every text fragment of a post.
import re

# Emoticon tags written in square brackets, e.g. "[哈哈]" (non-greedy, one line).
facePatt = re.compile(r'\[.*?\]')
# A fragment that starts with "@" is dropped up to the end of its first line.
namePatt = re.compile(r'^@.*')

# Each character of this literal -- including the line breaks embedded in it --
# is treated as punctuation and removed by filterpunt().
_PUNCT_CHARS = u'''|▲/#:!),.:;?]}¢'"、。〉》」』】〕〗〞︰︱︳﹐､﹒
﹔﹕﹖﹗﹚﹜﹞！），．：；？｜｝︴︶︸︺︼︾﹀﹂﹄﹏､～￠
々‖•·ˇˉ―--′’”([{£¥'"‵〈《「『【〔〖（［｛￡￥〝︵︷︹︻
︽︿﹁﹃﹙﹛﹝（｛“‘-—_…'''
punct = set(_PUNCT_CHARS)


def filterpunt(s):
    """Return ``s`` with every character that appears in ``punct`` removed.

    Accepts ``str`` as well as ``unicode`` input.
    """
    return ''.join(ch for ch in s if ch not in punct)
# Clean and segment one post.
def sp(row):
    """Strip HTML from ``row.weibo_content`` and segment the text into words.

    Each text node of the markup is cleaned -- emoticon tags such as
    ``[哈哈]`` removed, punctuation dropped, fragments starting with ``@``
    removed -- and whatever remains is tokenized with ``jieba.cut``.

    :param row: object exposing a ``weibo_content`` attribute (a Spark ``Row``
        selected from the Weibo DataFrame).
    :returns: list of tokens; ``[]`` when the content is null or empty.
    """
    content = row.weibo_content
    # The column is nullable: never hand None to BeautifulSoup; a post
    # without content simply contributes no tokens.
    if not content:
        return []
    res = list()
    for ct in BeautifulSoup(content).findAll(text=True):
        # Emoticons must be removed BEFORE punctuation filtering: punct
        # contains '[' and ']', so filtering first strips the brackets that
        # facePatt matches on and leaves the emoticon text to be counted.
        ct = namePatt.sub('', filterpunt(facePatt.sub('', ct)))
        if ct:
            # NOTE(review): sp runs inside Spark tasks, while the custom
            # jieba user dictionary is only loaded in the notebook kernel --
            # confirm the worker processes segment with it too.
            res += jieba.cut(ct)
    return res


# Initialise the SparkContext and an SQLContext on top of it.
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

# Stop-word RDD of (word, 1) pairs; only the keys matter, the pair shape lets
# it be used with pair-RDD operations such as subtractByKey. Mapping the text
# file directly keeps the data distributed instead of collecting every line
# to the driver and re-parallelizing it.
stopWordRDD = (sc.textFile("file:///notebooks/ciku/chinese_stopword.txt")
               .map(lambda word: (word, 1)))

#生成词典RDD
dictSchema = StructType([ \
    StructField("word",StringType(),True), \
    StructField("freq",StringType(),True), \
    StructField("prop",StringType(),True)
])
dictDF = sqlContext.load(source="com.databricks.spark.csv", path = "/notebooks/ciku/words-360w.res.txt",schema = dictSchema,delimiter=" ")
dictRDD = dictDF.select('word','prop').rdd.map(tuple)

if sqlContext:
    print "Spark 初始化完成"

Building prefix dict from the default dictionary ...
Loading model from cache /tmp/jieba.cache
Loading model cost 0.525 seconds.
Prefix dict has been built succesfully.


Spark 初始化完成


In [2]:
#定义数据结构
customSchema = StructType([ \
    StructField("Id",LongType(),True), \
    StructField("crawler_time",TimestampType(),True), \
    StructField("crawler_time_stamp",LongType(),True), \
    StructField("is_retweet",ByteType(),True), \
    StructField("user_id",LongType(),True), \
    StructField("nick_name",StringType(),True), \
    StructField("tou_xiang",StringType(),True), \
    StructField("user_type",StringType(),True), \
    StructField("weibo_id",LongType(),True), \
    StructField("weibo_content",StringType(),True), \
    StructField("zhuan",IntegerType(),True), \
    StructField("ping",IntegerType(),True), \
    StructField("zhan",IntegerType(),True), \
    StructField("url",StringType(),True), \
    StructField("device",StringType(),True), \
    StructField("locate",StringType(),True), \
    StructField("time",TimestampType(),True), \
    StructField("time_stamp",LongType(),True), \
    StructField("r_user_id",LongType(),True), \
    StructField("r_nick_name",StringType(),True), \
    StructField("r_user_type",StringType(),True), \
    StructField("r_weibo_id",LongType(),True), \
    StructField("r_weibo_content",StringType(),True), \
    StructField("r_zhuan",IntegerType(),True), \
    StructField("r_ping",IntegerType(),True), \
    StructField("r_zhan",IntegerType(),True), \
    StructField("r_url",StringType(),True), \
    StructField("r_device",StringType(),True), \
    StructField("r_location",StringType(),True), \
    StructField("r_time",TimestampType(),True), \
    StructField("r_time_stamp",LongType(),True), \
    StructField("pic_content",StringType(),True)
])

#读入微博数据
df = sqlContext.load(source="com.databricks.spark.csv", path = "/notebooks/weibo/weibo_freshdata.2016-05-01",schema = customSchema,delimiter="\t")

if df:
    print "df 创建成功"
#df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv")


df 创建成功


In [3]:
#选择数据
user = df[(df.time >= '2016-05-01 00:00:00') & (df.time <= '2016-05-01 03:00:00')]
print '已选择 %s 条数据...'%(user.count())
#分词
test = user.select('weibo_content').map(sp).collect()
#数词
rdd = sc.parallelize([(word,1) for line in test for word in line])
print '分成 %s 个词条...'%(rdd.count())

wcount = sc.parallelize(rdd.filter(lambda x: len(x[0]) >1).countByKey().items())
#排除停止词
wcount = wcount.subtractByKey(stopWordRDD)#.collect()

#排序
top100RDD = wcount.sortBy(lambda x: x[1],ascending=False)
top100 = top100RDD.collect()[:100]
#top100[:10]

#?????????

#关联词性
joinedRDD = top100RDD.leftOuterJoin(dictRDD)
joinedRDD.count()

已选择 269094 条数据...


TypeError: 'int' has no length

In [5]:
# Re-sort the joined vocabulary by count; x is (word, (count, prop or None)).
top100RDD2 = joinedRDD.sortBy(lambda x: x[1][0], ascending=False)
# take() preserves the sorted order and avoids collecting the whole RDD.
top1002 = top100RDD2.take(100)


In [6]:
# Most frequent 100 words whose dictionary tag starts with 'nr'
# (presumably jieba's person-name tags -- confirm); filter keeps the sort order.
top100N = (top100RDD2
           .filter(lambda x: x[1][1] is not None and x[1][1].startswith('nr'))
           .take(100))


In [7]:

# Chinese words need a font that contains CJK glyphs, otherwise they are drawn
# as empty boxes. Point this at such a .ttf, e.g.
# "/usr/local/lib/python2.7/dist-packages/wordcloud/yahei.ttf";
# None keeps WordCloud's default font.
CJK_FONT_PATH = None

fig, ax = plt.subplots(figsize=(14, 10))
wordcloud = WordCloud(font_path=CJK_FONT_PATH, max_font_size=40, relative_scaling=.5).fit_words(
    [(word, freq) for word, (freq, _) in top100N])
ax.imshow(wordcloud)
ax.axis("off")

plt.show()


TypeError: 'numpy.ndarray' does not have the buffer interface

<matplotlib.figure.Figure at 0x7f8e7168>

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.sql import SQLContext
# NOTE(review): Spark 1.x pyspark.ml accepts pyspark.mllib.linalg vectors, but
# from Spark 2.0 on it expects pyspark.ml.linalg.Vectors -- confirm the version.

# Toy input: four 2-D points forming two well-separated groups, each wrapped
# in a 1-tuple so every element is a single-value row.
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
