In [1]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import col
#from deep_translator import GoogleTranslator

In [2]:
# Service-account key file used to authenticate against Google Cloud Storage.
credentials_location = './gcs.json'

# The GCS connector jar is referenced by three settings below, so name it once.
gcs_connector_jar = "./lib/gcs-connector-hadoop3-2.2.5.jar"

# Local Spark configuration: use every local core, put the GCS connector on
# the driver and executor class paths, and enable service-account auth.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", gcs_connector_jar)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
    .set("spark.driver.extraClassPath", gcs_connector_jar)
    .set("spark.executor.extraClassPath", gcs_connector_jar)
)

In [3]:
# Reuse the SparkContext that is already running when this cell is executed
# again; constructing a second SparkContext in the same kernel raises an
# error. NOTE: a reused context keeps the configuration it was started with.
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop-level settings that map the gs:// scheme to the GCS connector classes
# and point the connector at the service-account key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/03/18 18:07:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Stop a session left over from a previous run of this notebook. On a fresh
# kernel `spark` has not been created yet at this point, so there is nothing
# to stop and the cell must not fail with a NameError.
if "spark" in globals():
    spark.stop()

In [4]:
# Create (or fetch) the SparkSession, configured from the SparkContext's settings.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [5]:
# Location of the raw course extract (one Parquet file in the GCS bucket).
courses_raw_path = 'gs://jugnu-france-course-enrollments/courses_data/courses_raw_parquet/1742298455.7402864.23a1a4c6d1.parquet'
df_courses = spark.read.parquet(courses_raw_path)

                                                                                

In [None]:
# Preview the first five rows of the loaded data.
df_courses.show(n=5)

In [6]:
# Values of the `libelle_nsf_1` column that are kept by the filter applied
# to df_courses. The strings must match the data exactly, so they are left
# in the original French spelling.
filter_values = [
    "Informatique, traitement de l'information, réseaux de transmission",
    "Enseignement, formation",
    "Commerce, vente",
    "Comptabilite, gestion",
    "Spécialités pluri-scientifiques",
    "Spécialites plurivalentes de la communication et de l'information",
]

In [7]:
# Keep only the rows whose first NSF label is one of the selected values.
is_selected_domain = col("libelle_nsf_1").isin(filter_values)
df_courses = df_courses.where(is_selected_domain)

In [8]:
# Display the number of rows currently in df_courses.
df_courses.count()

                                                                                

69210

In [9]:
# Mapping {existing column name: new column name} applied to df_courses.
columns_to_rename = {
    # provider and location
    'nom_of': 'provider',
    'siret': 'provider_ID',
    'nom_region': 'region',
    'nom_departement': 'department',
    # certification and classification labels
    'intitule_certification': 'certification_title',
    'libelle_niveau_sortie_formation': 'training_exit_level',
    'libelle_code_formacode_principal': 'main_formacode_desc',
    'libelle_nsf_1': 'nsf_code_1_desc',
    'libelle_nsf_2': 'nsf_code_2_desc',
    'libelle_nsf_3': 'nsf_code_3_desc',
    # the training itself
    'numero_formation': 'training_ID',
    'intitule_formation': 'title',
    'points_forts': 'strengths',
    # session counts
    'nb_session_active': 'nb_active_session',
    'nb_session_a_distance': 'nb_distant_session',
    # total hours (min / max / mean)
    'nombre_heures_total_min': 'duration_min',
    'nombre_heures_total_max': 'duration_max',
    'nombre_heures_total_mean': 'duration_mean',
    # total fees (min / max / mean)
    'frais_ttc_tot_min': 'cost_min',
    'frais_ttc_tot_max': 'cost_max',
    'frais_ttc_tot_mean': 'cost_mean',
}

In [10]:
# Rename the columns
# Apply the renames one at a time; names missing from the DataFrame are
# reported and skipped.
for source_col, target_col in columns_to_rename.items():
    if source_col not in df_courses.columns:
        print(f"Column '{source_col}' not found, skipping rename.")
        continue
    df_courses = df_courses.withColumnRenamed(source_col, target_col)

In [11]:
# Display the current column names of df_courses.
df_courses.columns

['date_extract',
 'provider',
 'department',
 'region',
 'type_referentiel',
 'code_inventaire',
 'code_rncp',
 'certification_title',
 'training_exit_level',
 'code_formacode_1',
 'code_formacode_2',
 'code_formacode_3',
 'code_formacode_4',
 'code_formacode_5',
 'main_formacode_desc',
 'code_rome_1',
 'code_rome_2',
 'code_rome_3',
 'code_rome_4',
 'code_rome_5',
 'nsf_code_1_desc',
 'nsf_code_2_desc',
 'nsf_code_3_desc',
 'code_nsf_1',
 'code_nsf_2',
 'code_nsf_3',
 'code_certifinfo',
 'provider_ID',
 'training_ID',
 'title',
 'strengths',
 'objectif_formation',
 'contenu_formation',
 'resultats_attendus_formation',
 'nb_action',
 'nb_active_session',
 'nb_distant_session',
 'duration_min',
 'duration_max',
 'duration_mean',
 'cost_min',
 'cost_max',
 'cost_mean',
 'code_departement',
 'code_region',
 'nbaction_nbheures',
 'coderegion_export']

In [12]:
# Collapse to a single partition and write the result to GCS as Parquet,
# replacing whatever already exists at the target path.
single_partition_writer = df_courses.coalesce(1).write.mode('overwrite')
single_partition_writer.parquet('gs://jugnu-france-course-enrollments/courses_data/courses_raw_parquet/france_courses_en.parquet')

25/03/18 18:08:25 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# Earlier experiments: translating columns to English (the cells below were not executed)

In [None]:
def translate_batch(unique_values, translator=None):
    """Translate each distinct value once and return a lookup dictionary.

    Args:
        unique_values: Iterable of values to translate; entries may be None.
        translator: Optional object exposing ``translate(text)``. When omitted,
            a ``GoogleTranslator(source='auto', target='en')`` from the
            ``deep_translator`` package is created.

    Returns:
        dict mapping every input value to its translation. ``None`` inputs and
        values whose translation raised an exception map to ``None``.
    """
    if translator is None:
        # Imported here because the module-level import of deep_translator is
        # commented out in this notebook.
        from deep_translator import GoogleTranslator
        translator = GoogleTranslator(source='auto', target='en')

    translations = {}
    for value in unique_values:
        if value is None:
            translations[value] = None
            continue
        try:
            translations[value] = translator.translate(value)
        except Exception as e:
            # Best effort: report the failure and keep going with the rest.
            print(f"Translation error: {e}")
            translations[value] = None
    return translations

In [None]:
# Columns whose values are translated. Only the certification title is
# enabled; 'title' and 'main_formacode_desc' are further candidates that are
# currently switched off.
columns_to_translate = ['certification_title']

# Lookup tables keyed by column name.
translation_maps = dict()

In [None]:
# Report how many partitions df_courses is currently split into.
partition_count = df_courses.rdd.getNumPartitions()
print(f"Number of partitions: {partition_count}")

In [None]:
# Redistribute the rows over 24 partitions.
df_courses = df_courses.repartition(numPartitions=24)

In [None]:
# For every column to translate, pull its distinct values to the driver and
# translate each of them once.
for col_name in columns_to_translate:
    distinct_rows = df_courses.select(col_name).distinct().collect()
    translation_maps[col_name] = translate_batch([row[0] for row in distinct_rows])

In [None]:
def _make_lookup_udf(lookup):
    """Return a string UDF that looks a value up in the broadcast dict `lookup`."""

    @F.udf(returnType=T.StringType())
    def lookup_translation(input_value):
        # dict.get yields None for values that have no entry in the map.
        return lookup.value.get(input_value)

    return lookup_translation


# Add a '<column>_en' column for every translated column. The broadcast map is
# handed to the UDF factory explicitly, so each UDF is tied to its own
# column's map rather than to a global name that is rebound on every pass.
for col_name in columns_to_translate:
    translation_map = translation_maps[col_name]
    broadcasted_map = sc.broadcast(translation_map)
    df_courses = df_courses.withColumn(
        f'{col_name}_en',
        _make_lookup_udf(broadcasted_map)(F.col(col_name)),
    )

In [None]:
@F.udf(returnType=T.StringType())
def translate(input):
    """Spark UDF that translates a single string to English.

    Args:
        input: The text to translate, or None.

    Returns:
        The translated text, or None when the input is None or the
        translation fails.
    """
    if input is None:
        return None  # Or return "" if you prefer an empty string

    # Imported inside the function because the module-level import of
    # deep_translator is commented out in this notebook; without these the
    # names below are undefined.
    from deep_translator import GoogleTranslator
    from deep_translator.exceptions import NotValidPayload

    try:
        return GoogleTranslator(source='auto', target='en').translate(input)
    except NotValidPayload:
        # Input the translator rejects as invalid: no translation, no log.
        return None
    except Exception as e:
        # Best effort: report any other failure and leave the value empty.
        print(f"Translation error: {e}")
        return None

In [None]:
# Add an English version of three text columns by running the `translate`
# UDF over each of them.
df_courses = (
    df_courses
    .withColumn('certification_title_en', translate(F.col('certification_title')))
    .withColumn('title_en', translate(F.col('title')))
    .withColumn('main_formacode_desc_en', translate(F.col('main_formacode_desc')))
)

In [None]:
# Preview the first five rows, including the added columns.
df_courses.show(n=5)

In [None]:
# Write df_courses to GCS as Parquet, replacing any existing output.
overwrite_writer = df_courses.write.mode('overwrite')
overwrite_writer.parquet('gs://jugnu-france-course-enrollments/courses_data/courses_raw_parquet/france_courses_en.parquet')

In [None]:
import pandas as pd
import gcsfs
import os

def read_parquet_from_gcs(gcs_path, credentials_path):
    """Read a Parquet file from Google Cloud Storage into a pandas DataFrame.

    Args:
        gcs_path: Path of the Parquet file, e.g. ``gs://bucket/dir/file.parquet``.
        credentials_path: Path to the credentials JSON file. It is exported as
            ``GOOGLE_APPLICATION_CREDENTIALS`` for this process. Pass a falsy
            value to leave the environment untouched.

    Returns:
        The loaded DataFrame, or None when anything goes wrong (the error is
        printed rather than raised).
    """
    try:
        # Honour the caller's credentials file instead of a hard-coded name.
        if credentials_path:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(credentials_path)

        fs = gcsfs.GCSFileSystem()
        with fs.open(gcs_path) as f:
            df = pd.read_parquet(f)
        return df
    except Exception as e:
        print(f"Error reading Parquet file: {e}")
        return None

# Example call. Both values are placeholders to be replaced with a real
# GCS object path and a real credentials file.
gcs_path = "gs://your-bucket/path/to/your/file.parquet"  # Replace with your GCS path
credentials_path = "gcs.json"  # Replace with your path

df = read_parquet_from_gcs(gcs_path=gcs_path, credentials_path=credentials_path)

# The reader returns None on failure, so only preview a successful load.
if df is not None:
    preview = df.head()
    print(preview)