In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("AuthorshipIdentificationWithPyspark").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "2g").\
        getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/11 14:53:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
sc.setLogLevel("WARN")

In [3]:
spark.conf.set("spark.dag.scheduler.broadcast.largeTasks.maxSize", "4 MiB")

In [4]:
# !pip install nltk

In [5]:
# import libraries 
import os 
import pandas as pd 
import numpy as np
import re

In [6]:
# !pip install pdfplumber 

In [7]:
import pdfplumber

### process the data for balzac 

In [8]:
def preprocess_text(text):
  
    # Remove digits
    text = re.sub('\d', '', text)
    
    # Remove punctuation
    text = re.sub(r'[^\w\s]', '', text)
    
    # Convert to lowercase
    text = text.lower()
       
    return text

In [9]:
def split_text(text, chunk_size):
    """Splits the text into chunks of a specified size.

    Args:
        text: The text to be split.
        chunk_size: The desired size of each text chunk.

    Returns:
        A list of text chunks.
    """
    
    chunks = []
    
    current_chunk = ''
    
    words = text.split()
    
    for word in words:
        
        if len(current_chunk) + len(word) + 1 <= chunk_size:
            current_chunk += word + ' '
        
        else:
            chunks.append(current_chunk.strip())
            current_chunk = word + ' '
    
    if current_chunk:
        chunks.append(current_chunk.strip())
    
    return chunks  

In [14]:
# create text files from pdf files 

folders = ["balzac", "flaubert"]
chunk_size = 100

rows = []
for folder in folders:
    for file in os.listdir(folder):
#       process pdf files
        if file.endswith('.pdf'):
            with pdfplumber.open(f'{folder}/{file}') as pdf:

                for page in pdf.pages[3::]:
                    text = page.extract_text()
                    text = preprocess_text(text)

                    chunks = split_text(text, chunk_size)

                    for chunk in chunks:
                        rows.append({
                            'filename': file,
                            'text': chunk,
                            'author': folder
                        })

#       process text files        
        if file.endswith(".txt"):
                with open(f"{folder}/{file}", "r") as txt:

                    text = txt.read()    

                    text = preprocess_text(text)

                    chunks = split_text(text, chunk_size)

                    for chunk in chunks:
                        rows.append({
                            'filename': file,
                            'text': chunk,
                            'author': folder
                    })
                  
data = pd.DataFrame(rows)

In [10]:
data = pd.read_csv("data.csv")

In [11]:
data.head()

Unnamed: 0,filename,text,author
0,Balzac-La-Vieille-Fille-FrenchPDF.pdf,french accessibilité disponibilité et gratuité...,balzac
1,Balzac-La-Vieille-Fille-FrenchPDF.pdf,balzac genre nouvelles contes numéro livre www...,balzac
2,Balzac-La-Vieille-Fille-FrenchPDF.pdf,pinterestcomfrenchpdf note ce livre numérique ...,balzac
3,Balzac-La-Vieille-Fille-FrenchPDF.pdf,utilisation strictement personnelle et ne peut...,balzac
4,Balzac-La-Vieille-Fille-FrenchPDF.pdf,la vieille fille à monsieur eugèneauguste geor...,balzac


In [12]:
# save preprocessed data
# data.to_csv("data.csv", index=False)

In [13]:
data.author.value_counts()

balzac      155406
flaubert     34933
Name: author, dtype: int64

it appears that we have enough data for this work

In [14]:
df = spark.read\
    .format("csv")\
    .option("multiline", True)\
    .option("header", "true")\
    .load("data.csv")\

df.select("*").show(n=3)

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+------+
|            filename|                text|author|
+--------------------+--------------------+------+
|Balzac-La-Vieille...|french accessibil...|balzac|
|Balzac-La-Vieille...|balzac genre nouv...|balzac|
|Balzac-La-Vieille...|pinterestcomfrenc...|balzac|
+--------------------+--------------------+------+
only showing top 3 rows



                                                                                

In [15]:
df.select("author").show(n=3)

+------+
|author|
+------+
|balzac|
|balzac|
|balzac|
+------+
only showing top 3 rows



In [16]:
df.columns

['filename', 'text', 'author']

In [17]:
df = df.select("text", "author")

In [18]:
df.select("author").distinct().show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------+
|  author|
+--------+
|flaubert|
|  balzac|
+--------+



                                                                                

In [19]:
df = df.dropna()

In [20]:
df.show(n=3)

+--------------------+------+
|                text|author|
+--------------------+------+
|french accessibil...|balzac|
|balzac genre nouv...|balzac|
|pinterestcomfrenc...|balzac|
+--------------------+------+
only showing top 3 rows



In [21]:
df.groupBy("author").count().show()

[Stage 7:>                                                          (0 + 1) / 1]

+--------+------+
|  author| count|
+--------+------+
|flaubert| 34933|
|  balzac|155406|
+--------+------+



                                                                                

In [22]:
import pyspark.ml.feature

In [23]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, IDF, CountVectorizer
from pyspark.ml.feature import StringIndexer 

In [24]:
tokenizer = Tokenizer(inputCol="text", outputCol="mytokens")
stopWordsRemover = StopWordsRemover(inputCol="mytokens", outputCol="filtered_tokens")
vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="vectorizedFeatures")

In [25]:
labelEncoder = StringIndexer(inputCol="author", outputCol="label").fit(df)

                                                                                

In [26]:
df = labelEncoder.transform(df)

In [27]:
(trainDF, testDF) = df.randomSplit((0.6, 0.4), seed=42)

In [28]:
trainDF.show() 

[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+--------+-----+
|                text|  author|label|
+--------------------+--------+-----+
|_________________...|  balzac|  0.0|
|_affaire_ ne donn...|  balzac|  0.0|
|_al_ pour elle un...|flaubert|  1.0|
|_aliénistes_ le s...|  balzac|  0.0|
|_appel par métivi...|  balzac|  0.0|
|_athalie_ et débi...|flaubert|  1.0|
|_authentique_ tom...|  balzac|  0.0|
|_baby_ crie et qu...|  balzac|  0.0|
|_barrows_ et les ...|flaubert|  1.0|
|_belle position_ ...|  balzac|  0.0|
|_bene sit_ je nai...|  balzac|  0.0|
|_benedicite_ sera...|  balzac|  0.0|
|_bien_ le jeune m...|  balzac|  0.0|
|_blagué_ mais je ...|  balzac|  0.0|
|_bonaparte_ on a ...|flaubert|  1.0|
|_bourguignon_ est...|  balzac|  0.0|
|_bovary_ la occup...|flaubert|  1.0|
|_brouetter le mon...|  balzac|  0.0|
|_broumbroumant_ e...|  balzac|  0.0|
|_broussonatia_ sa...|  balzac|  0.0|
+--------------------+--------+-----+
only showing top 20 rows



                                                                                

In [29]:
testDF.show()

+--------------------+--------+-----+
|                text|  author|label|
+--------------------+--------+-----+
|_ mai_ _procèsver...|  balzac|  0.0|
|_abyssus abyssum_...|  balzac|  0.0|
|_ahriman_ le prin...|flaubert|  1.0|
|_alberto_ nous pr...|  balzac|  0.0|
|_anglaises_ situé...|  balzac|  0.0|
|_antiquités_ coût...|  balzac|  0.0|
|_après avoir rega...|  balzac|  0.0|
|_attendu_ qui rés...|  balzac|  0.0|
|_attraper leurs p...|  balzac|  0.0|
|_auffredi_ de la ...|  balzac|  0.0|
|_avait donné ses ...|  balzac|  0.0|
|_banqueroutier_ f...|  balzac|  0.0|
|_belle hollandais...|  balzac|  0.0|
|_bellepoule_ pour...|  balzac|  0.0|
|_beltà folgorante...|  balzac|  0.0|
|_bien_ mais pour ...|  balzac|  0.0|
|_bienheureux les ...|  balzac|  0.0|
|_biophilie_ manif...|flaubert|  1.0|
|_bocal aux grands...|  balzac|  0.0|
|_bouillard_ espèc...|  balzac|  0.0|
+--------------------+--------+-----+
only showing top 20 rows



                                                                                

In [30]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier

In [31]:
lr = LogisticRegression(featuresCol="vectorizedFeatures", labelCol="label")

### Building Pipeline 

In [32]:
from pyspark.ml import Pipeline

In [33]:
pipeline = Pipeline(stages=[tokenizer, stopWordsRemover, vectorizer, idf, lr])

In [34]:
pipeline

Pipeline_a69492c1a57a

In [35]:
pipeline.stages

Param(parent='Pipeline_a69492c1a57a', name='stages', doc='a list of pipeline stages')

In [36]:
lr_model = pipeline.fit(trainDF)

                                                                                

23/06/11 14:54:02 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

23/06/11 14:54:06 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


[Stage 21:>                                                         (0 + 1) / 1]

23/06/11 14:54:09 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/11 14:54:09 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/06/11 14:54:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/06/11 14:54:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/06/11 14:54:09 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

23/06/11 14:54:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:10 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:12 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:12 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/06/11 14:54:12 WARN DAGScheduler: Broadcasting larg

In [37]:
lr_model

PipelineModel_e904f258514f

In [38]:
predictions = lr_model.transform(testDF) 

In [63]:
predictions.columns

['text',
 'author',
 'label',
 'mytokens',
 'filtered_tokens',
 'rawFeatures',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [64]:
predictions.select("text", "author", "probability", "prediction").show()

23/06/11 15:04:19 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
+--------------------+--------+--------------------+----------+
|                text|  author|         probability|prediction|
+--------------------+--------+--------------------+----------+
|_ mai_ _procèsver...|  balzac|[0.99999914546493...|       0.0|
|_abyssus abyssum_...|  balzac|[0.99999999988042...|       0.0|
|_ahriman_ le prin...|flaubert|[0.99023021493675...|       0.0|
|_alberto_ nous pr...|  balzac|[0.99999999999999...|       0.0|
|_anglaises_ situé...|  balzac|[0.99999999999998...|       0.0|
|_antiquités_ coût...|  balzac|           [1.0,0.0]|       0.0|
|_après avoir rega...|  balzac|[0.99999999999312...|       0.0|
|_attendu_ qui rés...|  balzac|           [1.0,0.0]|       0.0|
|_attraper leurs p...|  balzac|[0.99777527703001...|       0.0|
|_auffredi_ de la ...|  balzac|[0.99999999999905...|       0.0|
|_avait donné ses ...|  balzac|[0.99999999983479...|       0.0|
|_banqueroutier_ f

                                                                                

In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [65]:
testDF.show(n=20)

+--------------------+--------+-----+
|                text|  author|label|
+--------------------+--------+-----+
|_ mai_ _procèsver...|  balzac|  0.0|
|_abyssus abyssum_...|  balzac|  0.0|
|_ahriman_ le prin...|flaubert|  1.0|
|_alberto_ nous pr...|  balzac|  0.0|
|_anglaises_ situé...|  balzac|  0.0|
|_antiquités_ coût...|  balzac|  0.0|
|_après avoir rega...|  balzac|  0.0|
|_attendu_ qui rés...|  balzac|  0.0|
|_attraper leurs p...|  balzac|  0.0|
|_auffredi_ de la ...|  balzac|  0.0|
|_avait donné ses ...|  balzac|  0.0|
|_banqueroutier_ f...|  balzac|  0.0|
|_belle hollandais...|  balzac|  0.0|
|_bellepoule_ pour...|  balzac|  0.0|
|_beltà folgorante...|  balzac|  0.0|
|_bien_ mais pour ...|  balzac|  0.0|
|_bienheureux les ...|  balzac|  0.0|
|_biophilie_ manif...|flaubert|  1.0|
|_bocal aux grands...|  balzac|  0.0|
|_bouillard_ espèc...|  balzac|  0.0|
+--------------------+--------+-----+
only showing top 20 rows



In [43]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [44]:
accuracy = evaluator.evaluate(predictions)

23/06/11 14:54:28 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


                                                                                

In [45]:
accuracy

0.8663900961411827

here we got the accuracy of 0.86 which is super good for this kind of problems


### Test With new Data

In [101]:

flaubert_text = """Madame Bovary, mœurs de province est un roman de Gustave Flaubert paru en 1857. Il est considéré comme l'un des plus grands romans de la littérature française.

L'histoire se déroule dans la petite ville de Yonville-l'Abbaye, en Normandie, au début du XIXe siècle. Charles Bovary, un jeune médecin peu ambitieux, épouse Emma Rouault, une jeune femme romantique et idéaliste. Emma, qui a reçu une éducation sentimentale romanesque, est vite déçue par la vie de province et par son mari, qu'elle trouve ennuyeux et peu cultivé. Elle se prend de passion pour Rodolphe Boulanger, un riche agriculteur, puis pour Léon Dupuis, un jeune avocat. Ses liaisons adultères la mènent à la ruine financière et morale. Elle finit par se suicider en avalant de l'arsenic.

Madame Bovary est un roman réaliste qui montre la réalité de la vie de province au XIXe siècle. Il est aussi un roman psychologique qui montre la complexité de la personnalité d'Emma Bovary. Le roman a été condamné à l'époque pour son réalisme et son immoralité. Il est aujourd'hui considéré comme un chef-d'œuvre de la littérature française.
"""

balzac_text = """Le soir, à l'heure où le monde se déshabille et se met en négligé, où les vanités tombent une à une, comme les feux d'une illuminée, l'homme à projets, l'homme aux espérances trouva ce soir-là ses plans embrumés, ses mirages anéantis, son futur envolé. Il était brisé, épuisé, étourdi ; il se trouva pour la première fois au fond du précipice où tombe tout homme qui en rencontre un autre plus puissant que lui et qui le veut détruire. Il avait l'expérience de la défaite ; il ne la concevait pas encore. La vie, si facile quelques heures auparavant, lui apparut dès lors comme une lutte incessante avec des hommes et des choses, une bataille où chacun porte une arme, et où il faut regarder en même temps les ennemis et les alliés, surveiller toutes les actions, et même les plus indifférentes, car une circonstance imprévue peut révéler une trahison."""

rows = [(flaubert_text, ),(balzac_text, )]

new_entry = spark.createDataFrame(rows, ("text", ))

prediction = lr_model.transform(new_entry)

for row in prediction.select("prediction").collect():
    if  row.prediction==1.0:
        print("text of flaubert")
    elif  row.prediction==0.0:
        print("text of blazac")


23/06/11 15:18:06 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
text of flaubert
text of blazac


### Fin de notebook