In [1]:
# Entry point shared by Spark SQL and Spark MLlib: create (or reuse) the session.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("example-spark")
    # Enables cross joins for this session via spark.sql.crossJoin.enabled.
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

In [9]:
# Load the agents dataset.
# Forward slashes are accepted on Windows as well as POSIX, and they avoid the
# invalid "\d" escape sequence in the original '..\data/...' literal.
agents = spark.read.json("../data/agents.json")
agents

DataFrame[country_name: string, id: bigint, latitude: double, longitude: double, sex: string]

In [13]:
# Agents located in France.
french_agents = agents.filter(agents.country_name == "France")
# Only a cell's last expression is displayed, so print intermediate results explicitly.
print("French agents:", french_agents.count())

agent = french_agents.first()
# first() returns None when the DataFrame is empty.
if agent is not None:
    print(agent.country_name, agent.id)

# Reuse the already-filtered DataFrame instead of rebuilding the France filter.
print(
    "French agents with negative latitude:",
    french_agents.filter(french_agents.latitude < 0).count(),
)

agents.limit(5).show()

France 5130782577
+------------+----------+------------------+------------------+------+
|country_name|        id|          latitude|         longitude|   sex|
+------------+----------+------------------+------------------+------+
|       China| 227417393| 33.15219798270325|100.85840672174572|  Male|
|       Haiti|6821129477|19.325567983697297|-72.43795260265814|Female|
|       India|2078667700|23.645271492037235| 80.85636526088884|Female|
|       China| 477556555| 33.45864668881662| 93.33604038078953|Female|
|       India|1379059984|28.816938290678692|  80.7728698035823|Female|
+------------+----------+------------------+------------------+------+



In [14]:
# SQL: expose the DataFrame as a temporary view and query it.
# createOrReplaceTempView keeps the cell re-runnable; createTempView raises
# if "agents_table" already exists in the session catalog.
agents.createOrReplaceTempView("agents_table")
spark.sql("SELECT * FROM agents_table ORDER BY id DESC LIMIT 10").show()

+-----------------+----------+-------------------+-------------------+------+
|     country_name|        id|           latitude|          longitude|   sex|
+-----------------+----------+-------------------+-------------------+------+
| French Polynesia|7170821229|-15.004219445056265|-140.01650828107668|  Male|
|       Cabo Verde|7167692449|  16.00676587564149| -23.90898775675409|  Male|
|         Suriname|7166451460|  4.008871704322331| -55.97275746253122|Female|
|         Suriname|7166235088|   3.96442417744574|-56.077562332679605|Female|
|            Macau|7166034642| 21.944944804684596| 114.02447154998114|Female|
|       Montenegro|7164357515|  42.32131745506727| 19.168822000529843|  Male|
|Equatorial Guinea|7163867872|  3.651402073464487|  9.913739020397387|Female|
|           Bhutan|7163256789| 27.419739555133912|  90.29001406759927|Female|
|           Bhutan|7163004645| 27.281480489455422|  90.17405662751794|  Male|
|           Bhutan|7162877973|  27.37149433886258|  90.388829285

In [17]:
# Persist `agents` to avoid recomputing it (re-reading the JSON source) on every action.
agents.persist()
# The count is not the cell's last expression, so print it or it is silently discarded.
print(
    "French agents (RDD API):",
    agents.rdd.filter(lambda row: row.country_name == "France").count(),
)

from pyspark.sql import Row

# RDD of Rows -> DataFrame: the schema is inferred from the Row fields.
sc = spark.sparkContext
rdd = sc.parallelize([Row(name="Alice"), Row(name="Bob")])
spark.createDataFrame(rdd)

DataFrame[name: string]

In [19]:
from pyspark.sql import Row

#rdd=>df
def load_dataframe(path):
    """Load a whitespace-tokenised text file into a DataFrame (RDD -> DataFrame).

    Each non-blank line is split on whitespace. The first token becomes the
    ``label`` field and the remaining tokens become the ``words`` field
    (a list of strings).

    Args:
        path: Path or URI of the text file, as accepted by ``sc.textFile``.

    Returns:
        A DataFrame built from ``Row(label=..., words=...)`` records.
    """
    rdd = (
        sc.textFile(path)
        .map(lambda line: line.split())
        # Blank or whitespace-only lines split to [] and would raise IndexError
        # on words[0], failing the whole job; drop them instead.
        .filter(lambda words: len(words) > 0)
        .map(lambda words: Row(label=words[0], words=words[1:]))
    )
    return spark.createDataFrame(rdd)

# 20 Newsgroups train/test splits (one document per line: label then terms).
# Forward slashes work on Windows and POSIX alike, and they avoid the invalid
# "\d" escape sequence in the original "..\data/..." literals.
train_data = load_dataframe("../data/20ng-train-all-terms.txt")
test_data = load_dataframe("../data/20ng-test-all-terms.txt")

In [20]:
# Vectorisation: turn each document's word list into a bag-of-words count vector.
from pyspark.ml.feature import CountVectorizer

vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="bag_of_words",
)
# fit() learns the vocabulary from train_data only and returns the fitted model;
# the actual transformation happens in a later cell.
vectorizer_transformer = vectorizer.fit(train_data)

In [21]:
# Apply the vocabulary learned on train_data to both splits (train first, then test).
train_bag_of_words, test_bag_of_words = (
    vectorizer_transformer.transform(train_data),
    vectorizer_transformer.transform(test_data),
)

In [22]:
# Every distinct newsgroup label, alphabetically, with long names left untruncated.
(
    train_data
    .select("label")
    .distinct()
    .orderBy("label")
    .show(truncate=False)
)

+------------------------+
|label                   |
+------------------------+
|alt.atheism             |
|comp.graphics           |
|comp.os.ms-windows.misc |
|comp.sys.ibm.pc.hardware|
|comp.sys.mac.hardware   |
|comp.windows.x          |
|misc.forsale            |
|rec.autos               |
|rec.motorcycles         |
|rec.sport.baseball      |
|rec.sport.hockey        |
|sci.crypt               |
|sci.electronics         |
|sci.med                 |
|sci.space               |
|soc.religion.christian  |
|talk.politics.guns      |
|talk.politics.mideast   |
|talk.politics.misc      |
|talk.religion.misc      |
+------------------------+



In [23]:
# ML: encode the string labels as numeric indices for the classifier.
from pyspark.ml.feature import StringIndexer

label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
# The indexer only reads the "label" column, so fitting on the raw training
# split yields the same label -> index mapping as fitting on the vectorised one.
label_indexer_transformer = label_indexer.fit(train_data)

# Rebuild both feature frames from the raw splits rather than transforming
# train_bag_of_words / test_bag_of_words in place. On a re-run those frames
# would already contain "label_index", and StringIndexerModel raises an
# "Output column ... already exists" error instead of overwriting it.
# ML: train
train_bag_of_words = label_indexer_transformer.transform(
    vectorizer_transformer.transform(train_data)
)
# ML: test
test_bag_of_words = label_indexer_transformer.transform(
    vectorizer_transformer.transform(test_data)
)

In [24]:
# ML: train a Naive Bayes classifier on the bag-of-words counts
# (all hyper-parameters left at their Spark ML defaults).
from pyspark.ml.classification import NaiveBayes

nb_columns = dict(
    labelCol="label_index",
    featuresCol="bag_of_words",
    predictionCol="label_index_predicted",
)
classifier = NaiveBayes(**nb_columns)
# fit() returns the trained model, which is itself a transformer.
classifier_transformer = classifier.fit(train_bag_of_words)


In [25]:
# ML: predict on the held-out split and preview true vs. predicted label indices.
test_predicted = classifier_transformer.transform(test_bag_of_words)
comparison_columns = ["label_index", "label_index_predicted"]
test_predicted.select(*comparison_columns).limit(10).show()


+-----------+---------------------+
|label_index|label_index_predicted|
+-----------+---------------------+
|       17.0|                 17.0|
|       17.0|                 17.0|
|       17.0|                 17.0|
|       17.0|                 17.0|
|       17.0|                 17.0|
|       17.0|                 17.0|
|       17.0|                 19.0|
|       17.0|                 17.0|
|       17.0|                 17.0|
|       17.0|                 17.0|
+-----------+---------------------+



In [26]:
# ML: evaluate the predictions; "accuracy" is the fraction of test rows whose
# predicted index equals the true index.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index",
    predictionCol="label_index_predicted",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(test_predicted)
accuracy_report = "Accuracy = {:.2f}".format(accuracy)
print(accuracy_report)

Accuracy = 0.80


In [28]:
# Pipelines: the same vectorise -> index -> classify workflow, fitted in one call.
from pyspark.ml import Pipeline

classifier = NaiveBayes(
    labelCol="label_index",
    featuresCol="bag_of_words",
    predictionCol="label_index_predicted",
)
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words")

# Stages execute in list order: words -> bag_of_words, label -> label_index, then the classifier.
pipeline_stages = [vectorizer, label_indexer, classifier]
pipeline = Pipeline(stages=pipeline_stages)
pipeline_model = pipeline.fit(train_data)

# The fitted pipeline applies every stage to the raw test split.
test_predicted = pipeline_model.transform(test_data)