In [29]:

from pyspark.sql.types import ArrayType, StringType

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, year, avg, length, desc, split, concat_ws, count, isnull, size, udf

# Créer une session Spark avec le connecteur MongoDB
spark = SparkSession.builder \
    .appName("Analyse des données MongoDB") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/spark.data") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/spark.data") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

# Charger les données depuis MongoDB en tant que DataFrame Spark
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Distribution des brevets par année de publication
distribution_par_annee = df.groupBy(year("publication_date").alias("year")).count().orderBy("year")

# Distribution des brevets par pays
distribution_par_pays = df.groupBy("country").count().orderBy(desc("count"))

# Distribution des brevets par inventeur ou titulaire actuel
distribution_par_inventeur = df.select(explode("current_assignees").alias("assignee")).groupBy("assignee").count().orderBy(desc("count"))

# Distribution des brevets par langue
distribution_par_langue = df.groupBy("other_language").count().orderBy(desc("count"))

# Concaténation des descriptions en une seule chaîne de caractères
concat_descriptions = df.select(concat_ws(" ", "description").alias("concat_description"))

# Tokenisation des descriptions
mots_descriptions = concat_descriptions.select(explode(split("concat_description", "\\s+")).alias("mot")).filter(col("mot") != "")

# Concaténation des revendications en une seule chaîne de caractères
concat_revendications = df.select(concat_ws(" ", "claims").alias("concat_claims"))

# Tokenisation des revendications
mots_revendications = concat_revendications.select(explode(split("concat_claims", "\\s+")).alias("mot")).filter(col("mot") != "")

# Analyse de la longueur moyenne des descriptions et des revendications
longueur_moyenne_description = concat_descriptions.select(avg(length("concat_description")).alias("avg_description_length"))
longueur_moyenne_revendications = concat_revendications.select(avg(length("concat_claims")).alias("avg_claims_length"))

# Calcul de la fréquence des mots dans les descriptions
freq_mots_descriptions = mots_descriptions.groupBy("mot").count().orderBy(desc("count"))

# Calcul de la fréquence des mots dans les revendications
freq_mots_revendications = mots_revendications.groupBy("mot").count().orderBy(desc("count"))

# Distribution des brevets par inventeur
distribution_par_inventeur = df.select(explode("inventors").alias("inventor")) \
                                .groupBy("inventor").agg(count("*").alias("nombre_brevets")) \
                                .orderBy(col("nombre_brevets").asc())

# Distribution des brevets par date de publication
distribution_par_date_publication = df.groupBy("publication_date").count().orderBy("publication_date")


# Affichage des résultats
print("Distribution des brevets par inventeur :")
distribution_par_inventeur.show(truncate=False)


# Affichage des résultats
print("Distribution des brevets par année de publication :")
distribution_par_annee.show(truncate=False)

print("Distribution des brevets par pays :")
distribution_par_pays.show(truncate=False)

print("Distribution des brevets par inventeur ou titulaire actuel :")
distribution_par_inventeur.show(truncate=False)

print("Distribution des brevets par langue :")
distribution_par_langue.show(truncate=False)

print("Longueur moyenne des descriptions :")
longueur_moyenne_description.show()

print("Longueur moyenne des revendications :")
longueur_moyenne_revendications.show()

print("Mots les plus fréquents dans les descriptions :")
freq_mots_descriptions.show(truncate=False)

print("Mots les plus fréquents dans les revendications :")
freq_mots_revendications.show(truncate=False)

# Distribution des brevets par pays d'origine et année de publication
distribution_par_pays_et_annee = df.groupBy("country", year("publication_date").alias("year")).count().orderBy("country", "year")

# Affichage des résultats
print("Distribution des brevets par pays d'origine et année de publication :")
distribution_par_pays_et_annee.show(truncate=False)

# Explode des revendications pour obtenir une ligne par revendication
df_exploded = df.select("country", explode("claims").alias("claim"))

# Distribution des brevets par pays et nombre de revendications
distribution_par_pays_et_revendications = df_exploded.groupBy("country").count().orderBy(col("count").desc())

# Affichage des résultats
print("Distribution des brevets par pays et nombre de revendications :")
distribution_par_pays_et_revendications.show(truncate=False)

Distribution des brevets par inventeur :
+--------------------------+--------------+
|inventor                  |nombre_brevets|
+--------------------------+--------------+
|Alison Sunstrum           |1             |
|Yosef YOVEL               |1             |
|Thierry Michelon          |1             |
|Arno Ruckelshausen        |1             |
|Frank Bernard             |1             |
|Roee FINKELSHTAIN         |1             |
|Gregory Francis Chakwan Lu|1             |
|Frederic Bataille         |1             |
|Salah Sukkarieh           |1             |
|Alfred C. Dadson          |1             |
|Oddgeir HUSØY             |1             |
|Gabor Kosa                |1             |
|Martin LARSÉN             |1             |
|Jon Friedman              |1             |
|Julien Mercier            |1             |
|Andreas Linz              |1             |
|Jonathan BINNEY           |1             |
|Bernard Certain           |1             |
|Jung Youl Park            |1      

In [3]:
from difflib import SequenceMatcher
from pyspark.sql.functions import col

# Fonction pour calculer le pourcentage de similarité entre deux chaînes de caractères
def similarite(titre1, titre2):
    return SequenceMatcher(None, titre1, titre2).ratio() * 100

# Fonction pour calculer la similarité entre tous les titres non vides dans le DataFrame
def calculer_similarite_entre_titres(df):
    similarites = []
    titres_non_vides = df.filter(col("title").isNotNull()).select("title").collect()
    for i in range(len(titres_non_vides)):
        titre1 = titres_non_vides[i]["title"].lower()
        for j in range(i+1, len(titres_non_vides)):
            titre2 = titres_non_vides[j]["title"].lower()
            # Vérifier si les titres ne sont pas vides et que la similarité n'est pas de 100%
            if titre1 and titre2 and titre1 != titre2:
                pourcentage_similarite = similarite(titre1, titre2)
                similarites.append((titre1, titre2, pourcentage_similarite))
    return similarites

# Calculer les similarités entre tous les titres non vides
similarites_titres = calculer_similarite_entre_titres(df)

# Filtrer les similarités pour ne garder que celles qui ont une similarité inférieure à 60%
similarites_sous_60 = [sim for sim in similarites_titres if sim[2] < 60]

# Trier les similarités filtrées par ordre décroissant de similarité
top_similarites_sous_60 = sorted(similarites_sous_60, key=lambda x: x[2], reverse=True)[:100]

# Afficher les 20 meilleures paires de titres avec une similarité inférieure à 60%
for titre1, titre2, pourcentage_similarite in top_similarites_sous_60:
    print(f"Titre 1 : {titre1}, Titre 2 : {titre2}, Pourcentage de similarité : {pourcentage_similarite}%")


Titre 1 : a method for obtaining data from an image of an object of a user that has a biometric characteristic of the user, Titre 2 : method for verifying the identity of a user by identifying an object within an image that has a biometric characteristic of the user and mobile device for executing the method, Pourcentage de similarité : 59.93031358885017%
Titre 1 : dispositif de pulverisation de produits de traitement pour vegetaux, Titre 2 : dispositif pour l'application de micro-ondes en vue du traitement d'un materiau., Pourcentage de similarité : 59.863945578231295%
Titre 1 : method and apparatus for packing bunches of flowers into sleeves, Titre 2 : the method and apparatus for determining the number of living cells in a test fluid, Pourcentage de similarité : 59.863945578231295%
Titre 1 : composition comprenant du henne et/ou de l'indigo, une huile et un saccharide, et procede de coloration capillaire la mettant en œuvre, Titre 2 : composition a base de poudres de plante(s) indig

In [4]:
df_filtre = df.filter(col("country").isNotNull() & col("publication_date").isNotNull())

# Distribution des brevets par pays, année de publication et inventeur
distribution_par_pays_annee_inventeur = df_filtre.select(
    "country",
    year("publication_date").alias("year"),
    explode("inventors").alias("inventor")
).groupBy("country", "year", "inventor").count().orderBy("country", "year", desc("count"))

# Affichage des résultats
distribution_par_pays_annee_inventeur.show(truncate=False)

+---------+----+--------------------+-----+
|country  |year|inventor            |count|
+---------+----+--------------------+-----+
|Australia|2006|Wolfgang Rauscher   |1    |
|Australia|2006|Michael Hodges      |1    |
|Australia|2006|Thomas Giering      |1    |
|Australia|2006|Oliver Martin       |1    |
|Australia|2006|Graham Butler       |1    |
|Australia|2006|Joachim Voelkening  |1    |
|Australia|2006|Lysis Cubieres      |1    |
|Australia|2006|Gerhard Schwenk     |1    |
|Australia|2006|Yannick Mechine     |1    |
|Australia|2006|Nicholas John Gudde |1    |
|Australia|2007|Mircea Dan Bucevschi|1    |
|Australia|2007|Tim Ulmasov         |1    |
|Australia|2007|Neal A. Bringe      |1    |
|Australia|2007|Joanne J. Fillatti  |1    |
|Australia|2007|Toni Voelker        |1    |
|Australia|2007|Monica Colt         |1    |
|Australia|2007|Mendy Axlerad       |1    |
|Australia|2008|Steve Verdino       |1    |
|Australia|2008|John I. Compton     |1    |
|Australia|2008|Mark E. Peters  

In [5]:
from pyspark.sql.functions import asc

# Distribution des brevets par pays, langue et inventeur
distribution_par_pays_langue_inventeur = df_filtre.select(
    "country",
    "other_language",
    explode("inventors").alias("inventor")
).groupBy("country", "other_language", "inventor").count().orderBy("country", asc("count"))

# Affichage des résultats
distribution_par_pays_langue_inventeur.show(truncate=False)


+---------+------------------+----------------------------+-----+
|country  |other_language    |inventor                    |count|
+---------+------------------+----------------------------+-----+
|Australia|Helen Dacres      |Nam Cao Hoai Le             |1    |
|Australia|Christopher Cooper|Thomas Craig Masterman      |1    |
|Australia|Helen Dacres      |Nan Wu                      |1    |
|Australia|Dieter H. Klaubert|James Unch                  |1    |
|Australia|Dieter H. Klaubert|Poncho Meisenheimer         |1    |
|Australia|Christopher Cooper|Marshall Medoff             |1    |
|Australia|Stephen Ecob      |Stephen Ecob                |1    |
|Australia|Helen Dacres      |Helen Dacres                |1    |
|Australia|James M. Brennan  |Christopher Michael Mcginnis|1    |
|Australia|James M. Brennan  |James M. Brennan            |1    |
|Australia|Taras WANKEWYCZ   |Taras WANKEWYCZ             |1    |
|Australia|Stephen Ecob      |Ian Oliver                  |1    |
|Australia

In [6]:
from pyspark.sql.functions import year

# Filtrer les lignes avec des valeurs non nulles dans les colonnes "country" et "publication_date"
df_filtre = df.filter(col("country").isNotNull() & col("publication_date").isNotNull())

# Distribution des brevets par pays, année de publication et titre
distribution_par_pays_annee_titre = df_filtre.select(
    "country",
    year("publication_date").alias("year"),
    "title"
).groupBy("country", "year", "title").count().orderBy("country", "year", desc("count"))

# Affichage des résultats
distribution_par_pays_annee_titre.show(truncate=False)


+---------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|country  |year|title                                                                                                                                                |count|
+---------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Australia|2006|Portable apparatus for analysis of a refinery feedstock or a product of a refinery process                                                           |1    |
|Australia|2006|Value document with luminescent properties                                                                                                           |1    |
|Australia|2007|Nucleic acid constructs and methods for producing altered seed oil compositions                                        

In [19]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Connexion à la base de données PostgreSQL
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="PostgreSQL_16",
    user="postgres",
    password="12345678"
)

# Créer un moteur SQLAlchemy pour pandas
engine = create_engine('postgresql://postgres:12345678@localhost:5432/PostgreSQL_16')

# Dictionnaire contenant les DataFrames Spark
dataframes_spark = {
    'Distribution_par_annee': distribution_par_annee,
    'Distribution_par_pays': distribution_par_pays,
    'Distribution_par_inventeur': distribution_par_inventeur,
    'Distribution_par_langue': distribution_par_langue,
    'Longueur_moyenne_description': longueur_moyenne_description,
    'Longueur_moyenne_revendications': longueur_moyenne_revendications,
    'Freq_mots_descriptions': freq_mots_descriptions,
    'Freq_mots_revendications': freq_mots_revendications,
    'Distribution_par_pays_et_annee': distribution_par_pays_et_annee,
    'Distribution_par_pays_et_revendications': distribution_par_pays_et_revendications,
    'Distribution_par_pays_annee_inventeur': distribution_par_pays_annee_inventeur,
    'Distribution_par_pays_langue_inventeur': distribution_par_pays_langue_inventeur,
    'Distribution_par_pays_annee_titre': distribution_par_pays_annee_titre
}

# Convertir les DataFrames Spark en DataFrames pandas et les insérer dans PostgreSQL
for df_name, df_spark in dataframes_spark.items():
    df_pandas = df_spark.toPandas()
    df_pandas.to_sql(df_name, engine, if_exists='replace', index=False)

# Fermer la connexion PostgreSQL
conn.close()


In [26]:
import psycopg2
from psycopg2 import sql

# Connexion à la base de données
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="PostgreSQL_16",
    user="postgres",
    password="12345678"
)

# Création d'un curseur
cur = conn.cursor()

# Récupération de la liste des tables dans la base de données
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
tables = cur.fetchall()

# Parcours de chaque table et export des données
for table in tables:
    table_name = table[0]
    # Récupération des données de la table
    cur.execute(sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name)))
    data = cur.fetchall()
    
    # Écriture des données dans le fichier CSV avec l'encodage UTF-8
    with open(f"{table_name}.csv", "w", encoding="utf-8") as f:
        # Écriture de l'en-tête du fichier CSV avec les noms des colonnes
        cur.execute(sql.SQL("SELECT column_name FROM information_schema.columns WHERE table_name = %s"), (table_name,))
        columns = [col[0] for col in cur.fetchall()]
        f.write(','.join(columns) + '\n')
        
        # Écriture des données dans le fichier CSV
        for row in data:
            # Convertir chaque élément de la ligne en chaîne et le protéger contre les caractères spéciaux
            row_str = [str(elem).replace(',', '') if elem is not None else '' for elem in row]
            f.write(','.join(row_str) + '\n')


# Fermeture du curseur et de la connexion
cur.close()
conn.close()
