# Combine


In [1]:
df = sqlContext.createDataFrame([
    (1, 'a'), (2, 'a'),
    (3, 'b'), (4, 'b'),
    (5, 'c'), (6, 'c'),
    (7, 'd'), (8, 'd'),
], schema=['value', 'name'])

xf = df.select(df["name"].alias("nam"), df["value"].alias("val"))
pf = df.join(xf, df["name"] == xf["nam"], "inner").where(xf["val"] < df["value"]).select(df["value"], xf["val"], df["name"])

from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=['value', "val"], outputCol="features")
selected_features = assembler.transform(pf).select('features')
selected_features.collect()

[Row(features=DenseVector([8.0, 7.0])),
 Row(features=DenseVector([6.0, 5.0])),
 Row(features=DenseVector([4.0, 3.0])),
 Row(features=DenseVector([2.0, 1.0]))]

In [2]:
pf.show()


+-----+---+----+
|value|val|name|
+-----+---+----+
|    8|  7|   d|
|    6|  5|   c|
|    4|  3|   b|
|    2|  1|   a|
+-----+---+----+



In [4]:
df = sqlContext.createDataFrame([
    ('a', ('f1', 1)), ('a', ('f2', 2)),
    ('b', ('f1', 3)), ('b', ('f2', 4)),
    ('c', ('f1', 5)), ('c', ('f2', 6)),
    ('d', ('f1', 7)), ('d', ('f2', 8)),
], schema=['name', 'feature'])

import pyspark.sql.functions as F

df.groupBy('name').agg(F.collect_list('feature')).show()

+----+---------------------+
|name|collect_list(feature)|
+----+---------------------+
|   d|     [[f1,7], [f2,8]]|
|   c|     [[f1,5], [f2,6]]|
|   b|     [[f1,3], [f2,4]]|
|   a|     [[f1,1], [f2,2]]|
+----+---------------------+



In [6]:
df = sqlContext.createDataFrame([
    (1, 'a'), (2, 'a'),
    (3, 'b'), (4, 'b'),
    (5, 'c'), (6, 'c'),
    (7, 'd'), (8, 'd'),
    (7, 'd'), (8, 'd'),
], schema=['value', 'name'])

import pyspark.sql.functions as F

df.groupBy('name').agg(F.collect_list('value')).show()

+----+-------------------+
|name|collect_list(value)|
+----+-------------------+
|   d|       [7, 8, 7, 8]|
|   c|             [5, 6]|
|   b|             [3, 4]|
|   a|             [1, 2]|
+----+-------------------+



# Join

In [1]:
a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']]).toDF(["other", "b_id"])

c = a.join(b, a.a_id == b.b_id) #on = 'id'
c.show()

+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
|   c|  haw|   p3|   c|
|   b|  hem|   p2|   b|
|   a|  foo|   p1|   a|
+----+-----+-----+----+



# convert pyspark dataframe column from list to string

In [1]:
# from pyspark.sql.types import StructType, StructField
# schema = StructType([StructField("uuid",IntegerType(),True),StructField("test_123",ArrayType(StringType(),True),True)])
# rdd = sc.parallelize([[1, ["test","test2","test3"]], [2, ["test4","test","test6"]],[3,["test6","test9","t55o"]]])
# df = spark.createDataFrame(rdd, schema)

# df.show()


In [1]:
# from pyspark.sql.functions import udf, col

# join_udf = udf(lambda x: ",".join(x))
# df.withColumn("test_123", join_udf(col("test_123"))).show()


# convert rdd from list to string

# making our own sentiment classifier

In [17]:
originData=sc.textFile('douban_movie.txt')
originDistinctData=originData.distinct()
rateDocument=originDistinctData.map(lambda line : line.split('\t')).\
filter(lambda line : len(line)==2)
fiveRateDocument=rateDocument.filter(lambda line : int(line[0])==5)
print(fiveRateDocument.count())
fourRateDocument=rateDocument.filter(lambda line : int(line[0])==4)
print(fourRateDocument.count())
threeRateDocument=rateDocument.filter(lambda line : int(line[0])==3)
print(threeRateDocument.count())
twoRateDocument=rateDocument.filter(lambda line : int(line[0])==2)
print(twoRateDocument.count())
oneRateDocument=rateDocument.filter(lambda line : int(line[0])==1)
print(oneRateDocument.count())


727
1132
737
195
90


In [22]:
from pyspark.mllib.regression import LabeledPoint
def ReduceRate(a):
    if int(a) == 5 or int(a) == 4:
        return 1
    else:
        return 0
negRateDocument=oneRateDocument.union(twoRateDocument).union(threeRateDocument)
# negRateDocument.repartition(1)
positiveRateDocument=fiveRateDocument.union(fourRateDocument)
posRateDocument=sc.parallelize(positiveRateDocument.take(negRateDocument.count())) #.repartition(1)
allRateDocument=negRateDocument.union(posRateDocument)
# allRateDocument.repartition(1)


In [33]:
rate=allRateDocument.map(lambda s : ReduceRate(s[0]))
document=allRateDocument.map(lambda s: s[1])
# pip install jieba; conda install -c conda-forge jieba
import jieba
words=document.map(lambda w:"/".\
    join(jieba.cut_for_search(w))).\
    map(lambda line: line.split("/"))
print(words)
from pyspark.mllib.feature import HashingTF, IDF
hashingTF = HashingTF()
tf = hashingTF.transform(words)
tf.cache()
# 计算 TF-IDF 矩阵
idfModel = IDF().fit(tf)
tfidf = idfModel.transform(tf)
zipped=rate.zip(tfidf)
data=zipped.map(lambda line:LabeledPoint(line[0],line[1]))
training, test = data.randomSplit([0.6, 0.4], seed = 0)
from pyspark.mllib.classification import NaiveBayes
# NBmodel = NaiveBayes.train(training, 1.0)
NBmodel = NaiveBayes.train(training, 1.0)
predictionAndLabel = test.map(lambda p : (NBmodel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda x: 1.0 \
    if x[0] == x[1] else 0.0).count() / test.count()

PythonRDD[221] at RDD at PythonRDD.scala:48


In [38]:
#文本分类器
yourDocument=input("输入待分类的评论：")
yourwords="/".join(jieba.cut_for_search(yourDocument)).split("/")
yourtf = hashingTF.transform(yourwords)
yourtfidf=idfModel.transform(yourtf)
print('NaiveBayes Model Predict:',NBmodel.predict(yourtfidf))

输入待分类的评论：太精彩了
NaiveBayes Model Predict: 1.0


# 用该分类器给评论打分的

In [42]:
def Chinese_sen_classifier_Bayes(yourDocument):
    yourwords="/".join(jieba.cut_for_search(yourDocument)).split("/")
    yourtf = hashingTF.transform(yourwords)
    yourtfidf=idfModel.transform(yourtf)
    return NBmodel.predict(yourtfidf)

In [43]:
Chinese_sen_classifier_Bayes("太精彩了")

1.0

# classifer改进

In [47]:
#数据预处理中去掉停用词
from pyspark.mllib.classification import SVMWithSGD, SVMModel
text=words.flatMap(lambda w:w)
wordCounts = text.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b).\
sortBy(lambda x: x[1],ascending=False)
wordCounts.take(10)
def filterStopWords(line):
    for i in line:
        if i in stopwords:
            line.remove(i)
    return line
words=words.map(lambda w : filterStopWords(w))

#全分词模式
words=document.map(lambda w:"/".join(jieba.\
cut(w, cut_all=True))).\
map(lambda line: line.split("/"))
#用SVM
SVMmodel = SVMWithSGD.train(training, iterations=100)
predictionAndLabel = test.map(lambda p : (SVMmodel.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda x: 1.0 if x[0] == x[1] else 0.0).count() / test.count()



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 815.0 failed 1 times, most recent failure: Lost task 6.0 in stage 815.0 (TID 2419, localhost, executor driver): org.apache.spark.SparkException: Python worker did not connect back in time
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133)
	... 24 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker did not connect back in time
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133)
	... 24 more


# add Column

# multiple lists to DataFrame

In [None]:

import pandas as pd
lst1 = range(100)
lst2 = range(100)
lst3 = range(100)
percentile_list = pd.DataFrame(
    {'lst1Tite': lst1,
     'lst2Tite': lst2,
     'lst3Tite': lst3
    })
# training = sqlContext.createDataFrame(pdf)['view','sum_score','avg_score','var_score']
# from pandas(pdf) to spark df

# RDD to List(DF toList)

In [None]:
video_codeList = ReplyDF.select("video_code").rdd.flatMap(lambda x: x).collect()

# sqlfunctions

In [None]:
# pyspark.sql.functions