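# Introduction

This notebook downloads the SocialGouv *fiches-travail* dataset (labour-law fact sheets) and flattens every article into one row per section with PySpark. It then converts each section's HTML to Markdown and computes a sentence embedding for each section.

To run the code as a script rather than in Jupyter: `spark-submit --executor-memory 4G my_script.py`. Note that the first cell builds its session with `5g` for both executor and driver memory.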

In [1]:

from pyspark.sql import SparkSession
import requests
import json

# Build (or reuse, if one is already running) the notebook's Spark session,
# requesting 5g of memory for both the executor and the driver.
spark = (
    SparkSession.builder
    .appName("FichesPratiquesDT")
    .config("spark.executor.memory", "5g")
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)

# Read the memory settings back from the session to confirm what was applied.
executor_memory = spark.conf.get("spark.executor.memory")
driver_memory = spark.conf.get("spark.driver.memory")
for label, value in (("Executor Memory", executor_memory), ("Driver Memory", driver_memory)):
    print(f"{label}: {value}")

# Full SparkConf of the running context; to dump every setting, run:
#     for item in conf.getAll():
#         print(item)
conf = spark.sparkContext.getConf()

Executor Memory: 5g
Driver Memory: 5g


## Charger le JSON à partir d'une URL

In [2]:

# Step 1: Fetch the SocialGouv "fiches-travail" JSON from GitHub.
url = "https://raw.githubusercontent.com/SocialGouv/fiches-travail-data/master/data/fiches-travail.json"

# The timeout stops the cell from hanging forever on a stalled connection, and
# raise_for_status() reports HTTP errors (e.g. 404) directly instead of letting
# .json() fail with a confusing decode error on the error page.
response = requests.get(url, timeout=60)
response.raise_for_status()
json_data = response.json()  # list of article dicts (see the preview below)

# Preview the first record
display(json_data[:1])


[{'date': '02/08/2024',
  'description': "Chaque syndicat représentatif dans une entreprise ou un établissement d'au moins 50 salariés peut désigner un délégué syndical (DS). Le délégué (…)",
  'intro': '<p>Chaque syndicat représentatif dans une entreprise ou un établissement d’au moins 50 salariés peut désigner un délégué syndical (DS). Le délégué syndical exerce un rôle de représentation du syndicat auquel il appartient et de négociateur de conventions ou d’accords collectifs d’entreprise ou d’établissement. Il bénéficie d’une protection particulière en matière de licenciement.</p>',
  'pubId': 'article375542',
  'sections': [{'anchor': '',
    'html': '<div class="texteencadre-spip spip"><strong>À savoir&nbsp;!</strong><br class="autobr">Le mandat de délégué syndical peut être cumulé avec celui de membre de la délégation du personnel au CSE ou de représentant syndical à ce comité.</div>',
    'text': ' À savoir ! Le mandat de délégué syndical peut être cumulé avec celui de membre de

## Construire le DataFrame Spark à partir du JSON téléchargé

In [3]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DateType, MapType
from pyspark.sql.functions import explode, col, to_date

# Schema of one article record. Fields not listed here are not carried into
# the DataFrame built from the dicts below.
schema = StructType([
    StructField("date", StringType(), True),          # "dd/MM/yyyy" string, e.g. "02/08/2024"
    StructField("description", StringType(), True),
    StructField("intro", StringType(), True),         # HTML fragment
    StructField("pubId", StringType(), True),
    StructField("sections", ArrayType(
        StructType([
            StructField("anchor", StringType(), True),
            StructField("html", StringType(), True),
            StructField("text", StringType(), True),
            StructField("title", StringType(), True),
            StructField("description", StringType(), True),
            StructField("references", MapType(StringType(), StringType()), True)
        ])
    ), True)
])

# Number of articles to load. The Python UDF stages below run row by row, so a
# subset keeps iteration fast. Set to None to load every record
# (slicing with None keeps the whole list).
MAX_RECORDS = 1000
records = json_data[:MAX_RECORDS]

# Step 2: distribute the article dicts as an RDD
rdd = spark.sparkContext.parallelize(records)

# Step 3: build the DataFrame; the explicit schema maps each dict's keys onto columns.
# (The "date" column stays a string; convert with to_date(col("date"), "dd/MM/yyyy") if needed.)
df = spark.createDataFrame(rdd, schema=schema)

# One row per section. NOTE: explode() drops articles whose "sections" is null
# or empty; use explode_outer() if those articles must be kept.
df_flattened = df.withColumn("section", explode(col("sections")))

# Keep the article-level fields and lift the section fields to top-level columns
df_final = df_flattened.select(
    "date",
    "description",
    "intro",
    "pubId",
    col("section.title").alias("section_title"),
    col("section.html").alias("section_html"),
    col("section.text").alias("section_text"),
    col("section.anchor").alias("section_anchor"),
    col("section.description").alias("section_description"),
)

# Inspect the flattened schema and the first 10 rows
df_final.printSchema()
df_final.show(10, truncate=True)



root
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- intro: string (nullable = true)
 |-- pubId: string (nullable = true)
 |-- section_title: string (nullable = true)
 |-- section_html: string (nullable = true)
 |-- section_text: string (nullable = true)
 |-- section_anchor: string (nullable = true)
 |-- section_description: string (nullable = true)

+----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      date|         description|               intro|        pubId|       section_title|        section_html|        section_text|      section_anchor| section_description|
+----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|02/08/2024|Chaque syndicat r...|<p>Chaque syndica...|article375542|Les délégués synd...|

# Add a markdown column

Install the `markdownify` package (HTML → Markdown converter)

In [4]:
!pip install -q markdownify

Run the HTML → Markdown conversion

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from markdownify import markdownify as md

def html_to_markdown(html_content):
    """Convert an HTML fragment to Markdown, using ATX-style ("# Title") headings.

    Args:
        html_content: HTML string, or None for a null Spark value.

    Returns:
        The Markdown string, or None when ``html_content`` is None. markdownify
        cannot parse None, and an exception inside the UDF would fail the Spark job.
    """
    if html_content is None:
        return None
    return md(html_content, heading_style="ATX")


# Wrap the function as a Spark UDF producing a string column
html_to_markdown_udf = udf(html_to_markdown, StringType())

# Add the Markdown rendering of each section's HTML as a new column
df_markdown = df_final.withColumn(
    "section_markdown",
    html_to_markdown_udf(col("section_html"))
)

# Show the first 10 rows
df_markdown.show(10, truncate=True)

+----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      date|         description|               intro|        pubId|       section_title|        section_html|        section_text|      section_anchor| section_description|    section_markdown|
+----------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|02/08/2024|Chaque syndicat r...|<p>Chaque syndica...|article375542|Les délégués synd...|<div class="texte...| À savoir ! Le ma...|                    |À savoir ! Le man...|**À savoir !**  \...|
|02/08/2024|Chaque syndicat r...|<p>Chaque syndica...|article375542|L’action des synd...|<div class="spip_...|L'action des synd...|L-action-des-synd...|L'action des synd...|\n\n[L'action des...|
|02/08/2024|Chaque syndic

# Installation de PyTorch et de Transformers

In [6]:
!pip install transformers>=4.36.0 torch

In [7]:
### Is CUDA available?

In [8]:
import torch

# Installed PyTorch build
print(torch.__version__)

# Can this PyTorch build see a usable CUDA device?
cuda_available = torch.cuda.is_available()
print(f"CUDA available: {cuda_available}")

# How many GPUs are visible, and what each one is called
num_gpus = torch.cuda.device_count()
print(f"Number of GPUs available: {num_gpus}")
gpu_names = [torch.cuda.get_device_name(idx) for idx in range(num_gpus)]
for idx, name in enumerate(gpu_names):
    print(f"GPU {idx}: {name}")

2.4.0+cu121
CUDA available: True
Number of GPUs available: 1
GPU 0: NVIDIA GeForce GTX 1060


# Création du texte (et du résumé, désactivé pour l'instant)

In [10]:
import torch
from transformers import pipeline
from pyspark.sql.functions import coalesce, concat, col, lit, udf
from pyspark.sql.types import StringType

# Summarization with facebook/bart-large-cnn is optional. Loading the model costs a
# large download plus device memory, so it only happens when this flag is True.
ENABLE_SUMMARY = False

summarizer = None
if ENABLE_SUMMARY:
    # Use the GPU when one is visible; otherwise fall back to CPU instead of failing.
    summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device="cuda" if torch.cuda.is_available() else "cpu",
    )


def summarize_text(text):
    """Summarize ``text`` with the BART pipeline (25 to 50 tokens, no sampling).

    Returns None for null/empty input, since there is nothing to summarize.
    Raises RuntimeError if called while ENABLE_SUMMARY is False (no model loaded).
    NOTE(review): the pipeline is shipped to Spark Python workers inside the UDF
    closure; confirm this works with a GPU-resident model in your deployment.
    """
    if not text:
        return None
    if summarizer is None:
        raise RuntimeError("Summarizer not loaded: set ENABLE_SUMMARY = True and re-run this cell")
    summary = summarizer(
        text,
        max_length=50,
        min_length=25,
        do_sample=False,
        clean_up_tokenization_spaces=True,  # set explicitly to silence the transformers warning
    )
    return summary[0]['summary_text']


# Register the summarizer as a Spark UDF
summarize_udf = udf(summarize_text, StringType())

# Text to embed: section title, a blank line, then section text. coalesce() turns
# a null title/text into "", because concat() returns null if any input is null.
df_final = df_final.withColumn(
    "concatenated_text",
    concat(
        coalesce(col("section_title"), lit("")),
        lit("\n\n"),
        coalesce(col("section_text"), lit("")),
    ),
)

# Prefix the text with a "search_document: " task instruction.
# NOTE(review): this prefix is a convention of some instruction-tuned embedding
# models; confirm that the embedding model used later expects it.
df_final = df_final.withColumn(
    "instructed_text",
    concat(lit("search_document: "), col("concatenated_text")),
)

base_columns = [
    "section_title",
    "section_text",
    "section_html",
    "concatenated_text",
    "instructed_text",
]

if ENABLE_SUMMARY:
    summarized_df = df_final.withColumn("summary", summarize_udf(col("concatenated_text")))
    result_df = summarized_df.select(*base_columns, "summary")
else:
    result_df = df_final.select(*base_columns)

# Show the resulting DataFrame
result_df.show(10, truncate=True)


+--------------------+--------------------+--------------------+--------------------+--------------------+
|       section_title|        section_text|        section_html|   concatenated_text|     instructed_text|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Les délégués synd...| À savoir ! Le ma...|<div class="texte...|Les délégués synd...|search_document: ...|
|L’action des synd...|L'action des synd...|<div class="spip_...|L’action des synd...|search_document: ...|
|Quelles sont les ...|Chaque organisati...|<p>Chaque organis...|Quelles sont les ...|search_document: ...|
|Quel est le nombr...|Cas généralLe nom...|<h4 class="spip">...|Quel est le nombr...|search_document: ...|
|Quelle est la dur...|Le mandat de délé...|<p>Le mandat de d...|Quelle est la dur...|search_document: ...|
|Quelles sont les ...|Le délégué syndic...|<p>Le délégué syn...|Quelles sont les ...|search_document: ...|
|Peut-il y avoir c...|Oui, les foncti

# Create the embedding column

[Massive Text Embedding Benchmark (MTEB) Leaderboard.](https://huggingface.co/spaces/mteb/leaderboard)

In [15]:
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType

# Embedding model (see the MTEB leaderboard linked above for alternatives).
# Alternative: 'Alibaba-NLP/gte-multilingual-base'. It presumably ships custom
# modelling code and then needs TRUST_REMOTE_CODE = True.
model_name_or_path = 'sentence-transformers/all-MiniLM-L6-v2'

# trust_remote_code executes Python code downloaded from the model repository.
# Keep it off unless the chosen model actually requires it.
TRUST_REMOTE_CODE = False

tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
model = AutoModel.from_pretrained(model_name_or_path, trust_remote_code=TRUST_REMOTE_CODE)


def embed_markdown(markdown_content, normalize=False):
    """Return a sentence embedding of ``markdown_content`` as a list of floats.

    The text is tokenized (truncated to the tokenizer's maximum length) and run
    through ``model``. The last hidden states are then averaged over the real,
    non-padding tokens.

    Args:
        markdown_content: Text to embed, or None for a null Spark value.
        normalize: If True, L2-normalize the embedding. Defaults to False,
            which matches the previous (unnormalized) output.

    Returns:
        A list of floats with one value per hidden unit, or None for None input.
    """
    if markdown_content is None:
        return None

    inputs = tokenizer(markdown_content, padding=True, truncation=True, return_tensors='pt')

    with torch.no_grad():
        token_states = model(**inputs).last_hidden_state  # (batch=1, seq_len, hidden)

        attention_mask = inputs.get('attention_mask')
        if attention_mask is None:
            attention_mask = torch.ones(token_states.shape[:2], dtype=torch.long)

        # Mask-weighted mean, so that padding tokens (if any) do not dilute the average
        mask = attention_mask.unsqueeze(-1).to(token_states.dtype)
        pooled = (token_states * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)

        if normalize:
            pooled = F.normalize(pooled, p=2, dim=1)

    return pooled[0].tolist()


# Register the UDF with PySpark
embed_markdown_udf = udf(embed_markdown, ArrayType(FloatType()))

# Embed the "instructed_text" column (prefixed title + section text) of df_final
embedded_df = df_final.withColumn(
    "instructed_text_embedding",
    embed_markdown_udf(col("instructed_text"))
)

# Select the relevant columns and display the result
result_df = embedded_df.select(
    "date",
    "section_title",
    "section_text",
    "instructed_text_embedding",
    "instructed_text",
    "concatenated_text",
)

# Show the resulting DataFrame
result_df.show(10, truncate=True)


+----------+--------------------+--------------------+-------------------------+--------------------+--------------------+
|      date|       section_title|        section_text|instructed_text_embedding|     instructed_text|   concatenated_text|
+----------+--------------------+--------------------+-------------------------+--------------------+--------------------+
|02/08/2024|Les délégués synd...| À savoir ! Le ma...|     [-0.08907351, 0.0...|search_document: ...|Les délégués synd...|
|02/08/2024|L’action des synd...|L'action des synd...|     [-0.0386786, -0.0...|search_document: ...|L’action des synd...|
|02/08/2024|Quelles sont les ...|Chaque organisati...|     [-0.08338758, 0.1...|search_document: ...|Quelles sont les ...|
|02/08/2024|Quel est le nombr...|Cas généralLe nom...|     [-0.056518886, -0...|search_document: ...|Quel est le nombr...|
|02/08/2024|Quelle est la dur...|Le mandat de délé...|     [-0.049893327, 0....|search_document: ...|Quelle est la dur...|
|02/08/2024|Quel

# Write the DataFrame

In [None]:
# Export the flattened, one-row-per-section DataFrame with its embeddings.
# The raw `df` still holds the nested `sections` array<struct> column, which the
# CSV writer cannot serialize.
# Cache first: each write below is a separate Spark action, and without caching
# the embedding UDF would be recomputed for every one.
export_df = embedded_df.cache()

# CSV cannot store array columns, so the embedding is left out of the CSV export.
# Spark writes a directory of part files at this path.
(export_df.drop("instructed_text_embedding")
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output.csv"))

# Full data (embedding included) as JSON, in a directory of part files
export_df.write.mode("overwrite").json("output.json")

# Same data in a single part file (coalesce(1) funnels everything through one task)
export_df.coalesce(1).write.mode("overwrite").json("single_output.json")

# Spark's JSON writer already emits JSON Lines; lineSep pins the record separator to "\n"
export_df.write.mode("overwrite").json("output.jsonl", lineSep="\n")

# Release the cached data now that all exports are written
export_df.unpersist()