# How to load CSV files from a stage into Snowflake Notebooks 📁

In this example, we show how to load a CSV file from a stage and create a table with Snowpark.

Let's start by calling `get_active_session` to obtain the session context variable needed to work with Snowpark:

In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Add a query tag to the session. This helps with troubleshooting and performance monitoring.
session.query_tag = {"origin":"sf_sit-is", 
                     "name":"notebook_demo_pack", 
                     "version":{"major":1, "minor":0},
                     "attributes":{"is_quickstart":1, "source":"notebook", "vignette":"csv_from_s3"}}
print(session)

## Create an external stage

Next, we create an external stage that references data files stored outside Snowflake.
In this example, the data lives in an S3 bucket.

In [None]:

CREATE DATABASE IF NOT EXISTS CITIBIKE;

USE DATABASE CITIBIKE;

USE SCHEMA PUBLIC;

CREATE STAGE IF NOT EXISTS CITIBIKE_STAGE 
	URL = 's3://logbrain-datalake/datasets/citibike-trips-csv/';

## Inspect the files in the stage

Let's look at the files available in the stage.

In [None]:
LS @CITIBIKE_STAGE;

## Load the CSV file with Snowpark

We can use the Snowpark DataFrameReader to read the CSV file.

With the `infer_schema = True` option, Snowflake automatically infers the schema from the data types found in the CSV file, so you don't have to define it in advance.

In [None]:
# Create a DataFrame that is configured to load data from the CSV file.
df = session.read.options({"infer_schema":True}).csv('@CITIBIKE_STAGE/trips_2018_0_0_0.csv.gz')

In [None]:
df
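To make the idea of schema inference concrete, here is a minimal local sketch in plain Python. It is not Snowflake's algorithm: the `infer_type` and `infer_column_types` helpers and the sample rows are hypothetical, and the sketch only distinguishes integers, floats and strings.

```python
import csv
import io

def infer_type(values):
    """Return the narrowest of 'int', 'float', 'str' that fits every non-empty value."""
    def fits(cast):
        try:
            for v in values:
                if v != "":
                    cast(v)
            return True
        except ValueError:
            return False
    if fits(int):
        return "int"
    if fits(float):
        return "float"
    return "str"

def infer_column_types(csv_text):
    """Guess one type per column from headerless CSV text (hypothetical helper)."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    columns = zip(*rows)  # transpose rows into columns
    return [infer_type(col) for col in columns]

# Made-up sample rows, for illustration only.
sample = "680,40.76,W 52 St\n1282,40.72,Broadway\n"
print(infer_column_types(sample))
```

The sketch infers types only. It has no column names to work with, which is one reason an explicit schema can still be useful.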

## Work with the Snowpark DataFrame

Now that the data is loaded into a Snowpark DataFrame, we can manipulate it with the Snowpark DataFrame API.

For example, we can compute descriptive statistics on the columns.

In [None]:
df.describe()

In [None]:
df.dtypes

### Creating a data schema

In [None]:
from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType

trips_schema = StructType([
    StructField("tripduration", IntegerType()),
    StructField("starttime", TimestampType()),
    StructField("stoptime", TimestampType()),
    StructField("start_station_id", IntegerType()),
    StructField("start_station_name", StringType()), 
    StructField("start_station_latitude", FloatType()),
    StructField("start_station_longitude", FloatType()),
    StructField("end_station_id", IntegerType()), 
    StructField("end_station_name", StringType()),
    StructField("end_station_latitude", FloatType()),
    StructField("end_station_longitude", FloatType()),  
    StructField("bikeid", IntegerType()),
    StructField("membership_type", StringType()), 
    StructField("usertype", StringType()), 
    StructField("birth_year", IntegerType()), 
    StructField("gender", IntegerType())
    ])

## Using a schema
To get named columns in the DataFrame, we pass `trips_schema` when loading the data.
We also set format options so that the file is read correctly.

In [None]:
# Re-create the DataFrame, this time applying the explicit schema and CSV format options.
# infer_schema is omitted because the schema is supplied through .schema().
df = session.read.options({
    "field_delimiter": ",",
    "skip_header": 1,
    "field_optionally_enclosed_by": '\042',  # octal escape for the double-quote character
    "null_if": (''),
}).schema(trips_schema).csv('@CITIBIKE_STAGE/trips_2018_0_0_0.csv.gz')

In [None]:
df
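The effect of these format options can be approximated locally with Python's standard `csv` module. This is only a sketch of the semantics, not Snowflake's parser; the `parse_csv` helper and the sample text are hypothetical.

```python
import csv
import io

def parse_csv(text, field_delimiter=",", skip_header=1, enclosed_by='"', null_if=("",)):
    """Local approximation of the reader options (hypothetical helper, not Snowflake code)."""
    reader = csv.reader(io.StringIO(text), delimiter=field_delimiter, quotechar=enclosed_by)
    rows = list(reader)[skip_header:]  # drop the header line(s)
    # Replace any value listed in null_if with None, mimicking SQL NULL.
    return [[None if value in null_if else value for value in row] for row in rows]

# Made-up sample: one header line, one quoted field containing a comma, one empty field.
sample = 'tripduration,start_station_name,birth_year\n680,"W 52 St, 11 Ave",\n'
print(parse_csv(sample))
```

In the sketch, the quoted station name keeps its embedded comma, the header row is skipped, and the empty trailing field becomes `None`.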

### Create a file format

We can also create the file format separately and reference it when loading the data.

In [None]:
format = session.sql("create or replace file format csv_format type = 'CSV' field_delimiter = ',' record_delimiter = '\n' skip_header = 1  field_optionally_enclosed_by = '\042' null_if = (''); ").collect()

df = session.read.option("format_name", "csv_format").schema(trips_schema).csv('@CITIBIKE_STAGE/trips_2018_0_0_0.csv.gz')

In [None]:
df
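The `'\042'` and `'\n'` sequences in the statement above sit inside an ordinary Python string, so Python interprets them before the SQL text reaches Snowflake. The short sketch below runs locally and uses hypothetical variable names to show the difference between a regular and a raw string literal.

```python
# In a regular Python string, "\042" is an octal escape for the double-quote
# character, so Python resolves it before the SQL text is sent.
regular = "field_optionally_enclosed_by = '\042'"
print(regular)   # field_optionally_enclosed_by = '"'

# In a raw string the backslash is kept, so the escape text itself is sent.
raw = r"field_optionally_enclosed_by = '\042'"
print(raw)       # field_optionally_enclosed_by = '\042'

print(chr(0o42) == '"')  # True
```

Which form to use depends on whether you want Python or Snowflake to interpret the escape sequence.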

## Write the DataFrame to a Snowflake table

We can save the DataFrame to a table named `TRIPS` and then query it in SQL.

In [None]:
df.write.mode("overwrite").save_as_table("TRIPS")

In [None]:
-- Preview the newly created TRIPS table
SELECT * from TRIPS;

## Read the table back into Snowpark

We can read the table back into Snowpark with the `session.table` syntax.

In [None]:
df = session.table("TRIPS")
df

## Continue processing the data

From here, you can keep querying and processing the data.

In [None]:
df.groupBy('"START_STATION_NAME"').count()

### Create an external stage for the JSON data
We now create a second external stage, this time pointing to JSON weather files in S3.

In [None]:

USE DATABASE CITIBIKE;

CREATE STAGE IF NOT EXISTS WEATHER_STAGE 
	URL = 's3://logbrain-datalake/datasets/weather-nyc-json/';

### Check the files in the stage

In [None]:
LS @WEATHER_STAGE;

### Load the JSON file with Snowpark
We can use the Snowpark DataFrameReader to read the JSON file.

No schema is passed here. The later cells address each record through a single semi-structured column, `$1`, and extract typed fields from it in SQL.

In [None]:
# Create a DataFrame that is configured to load data from the json file.
df = session.read.option("compression", "gzip").json('@WEATHER_STAGE/hourlyData-2018-1.json.gz')

In [None]:
df

In [None]:
df.describe()

In [None]:
df.dtypes

## Write the DataFrame to a Snowflake table

We can save the DataFrame to a table named `WEATHER_JSON` and then query it in SQL.

In [None]:
df.write.mode("overwrite").save_as_table("WEATHER_JSON")

In [None]:
-- Preview the newly created weather table
SELECT * from WEATHER_JSON;

### Read the table back into Snowpark

In [None]:
df = session.table("WEATHER_JSON")
df

## Parse the JSON table

In [None]:
USE DATABASE CITIBIKE;

CREATE TABLE IF NOT EXISTS weather as
select 
   -- DECIMAL without precision/scale is NUMBER(38,0) and would round fractional values, so use FLOAT.
   $1:"coco"::STRING as "coco",
   $1:"country"::STRING as "country",
   $1:"dwpt"::FLOAT as "dwpt",
   $1:"elevation"::STRING as "elevation",
   $1:"icao"::STRING as "icao",
   $1:"latitude"::FLOAT as "latitude",
   $1:"longitude"::FLOAT as "longitude",
   $1:"name"::STRING as "name",
   $1:"obsTime"::TIMESTAMP as "obsTime",
   $1:"prcp"::STRING as "prcp",
   $1:"pres"::FLOAT as "pres",
   $1:"region"::STRING as "region",
   $1:"rhum"::STRING as "rhum",
   $1:"snow"::STRING as "snow",
   $1:"station"::STRING as "station",
   $1:"temp"::FLOAT as "temp",
   $1:"timezone"::STRING as "timezone",
   $1:"tsun"::STRING as "tsun",
   $1:"wdir"::STRING as "wdir",
   $1:"weatherCondition"::STRING as "weatherCondition",
   $1:"wmo"::STRING as "wmo",
   $1:"wpgt"::STRING as "wpgt",
   $1:"wspd"::FLOAT as "wspd"
 from WEATHER_JSON;
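Each `$1:"key"::TYPE` expression in the query looks up one key in the JSON record and casts the result. A plain-Python sketch of that idea follows; the `extract` helper is hypothetical and the sample record is made up, with only the key names taken from the query.

```python
import json
from datetime import datetime

def extract(record, key, cast):
    """Mimic $1:"key"::TYPE: look up a key, cast it, and return None (NULL) if absent."""
    value = record.get(key)
    return None if value is None else cast(value)

# Made-up record for illustration; only the key names come from the query above.
raw = '{"name": "Example Station", "temp": "3.9", "obsTime": "2018-01-01T00:00:00"}'
record = json.loads(raw)

row = {
    "name": extract(record, "name", str),
    "temp": extract(record, "temp", float),
    "obsTime": extract(record, "obsTime", datetime.fromisoformat),
    "snow": extract(record, "snow", str),  # key missing from the record
}
print(row)
```

A key that is absent from the record yields `None` here, just as a path to a missing key yields NULL in Snowflake, so a misspelled key name fails silently rather than raising an error.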


In [None]:
-- Preview the newly created weather table
SELECT * from WEATHER;

### Read the table back into Snowpark

In [None]:
df = session.table("WEATHER")
df

### Create a JSON file format

In [None]:
format = session.sql("create or replace file format json_format type = 'JSON' strip_outer_array = true;").collect()

# Create a DataFrame that is configured to load data from the json file.
df = session.read.option("format_name", "json_format").json('@WEATHER_STAGE/hourlyData-2018-1.json.gz')

df
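The `strip_outer_array = true` option asks Snowflake to remove the outer JSON array and load each element as its own row. The following plain-Python sketch illustrates the difference; `load_records` is a hypothetical local helper and the data is made up.

```python
import json

def load_records(text, strip_outer_array):
    """Return the rows a JSON document would produce (hypothetical local helper)."""
    document = json.loads(text)
    if strip_outer_array and isinstance(document, list):
        return document          # one row per array element
    return [document]            # the whole document as a single row

# Made-up document: an outer array holding two records.
raw = '[{"station": "A", "temp": 1.5}, {"station": "B", "temp": 2.0}]'
print(len(load_records(raw, strip_outer_array=False)))  # 1
print(len(load_records(raw, strip_outer_array=True)))   # 2
```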

In [None]:
df.write.mode("overwrite").save_as_table("WEATHER_JSON")

In [None]:
select * from WEATHER_JSON;

In [None]:
USE DATABASE CITIBIKE;

CREATE OR REPLACE TABLE  weather as
select 
   -- DECIMAL without precision/scale is NUMBER(38,0) and would round fractional values, so use FLOAT.
   $1:"coco"::STRING as "coco",
   $1:"country"::STRING as "country",
   $1:"dwpt"::FLOAT as "dwpt",
   $1:"elevation"::STRING as "elevation",
   $1:"icao"::STRING as "icao",
   $1:"latitude"::FLOAT as "latitude",
   $1:"longitude"::FLOAT as "longitude",
   $1:"name"::STRING as "name",
   $1:"obsTime"::TIMESTAMP as "obsTime",
   $1:"prcp"::STRING as "prcp",
   $1:"pres"::FLOAT as "pres",
   $1:"region"::STRING as "region",
   $1:"rhum"::STRING as "rhum",
   $1:"snow"::STRING as "snow",
   $1:"station"::STRING as "station",
   $1:"temp"::FLOAT as "temp",
   $1:"timezone"::STRING as "timezone",
   $1:"tsun"::STRING as "tsun",
   $1:"wdir"::STRING as "wdir",
   $1:"weatherCondition"::STRING as "weatherCondition",
   $1:"wmo"::STRING as "wmo",
   $1:"wpgt"::STRING as "wpgt",
   $1:"wspd"::FLOAT as "wspd"
 from WEATHER_JSON;

In [None]:
select * from weather;

### Clean up resources

In [None]:
-- Database, tables and stages created as part of this example
DROP TABLE IF EXISTS TRIPS;
DROP TABLE IF EXISTS WEATHER;
DROP TABLE IF EXISTS WEATHER_JSON;
DROP STAGE IF EXISTS CITIBIKE_STAGE;
DROP STAGE IF EXISTS WEATHER_STAGE;
DROP DATABASE IF EXISTS CITIBIKE;

## Conclusion
In this example, we saw how to load CSV and JSON files from an external stage in order to process and query the data in a notebook with Snowpark.