## Imports

In [1]:
import pandas as pd
import requests
import os
import datetime
import gzip
import shutil
from pprint import pprint
import numpy as np
from tqdm import tqdm, tqdm_pandas
#sql alchemy
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import column_property
from sqlalchemy import Column, Integer, Float, String
from sqlalchemy import insert, select

# GESTION DES DOSSIERS

In [2]:
test=os.getcwd()
print(test)

/home/lenobian/Git/projects/ETL_bank/notebook


## Creation du dossier raw contenant les données brutes

In [24]:
#Création du dossier raw pour les donnée brutes
path="/home/lenobian/Formation/P6_ETL/data/raw"  #A modifier selon l'utilisateur
os.makedirs(path, exist_ok=True)

## Creation du dossier horodaté

In [25]:
#Récuperation de la date du jour
date=datetime.datetime.today().strftime('%Y-%m-%d')
print(type(date), date)

#Récupération de l'année au format int
year=int(datetime.datetime.today().strftime('%Y'))
print(type(year),year)

<class 'str'> 2022-07-08
<class 'int'> 2022


In [26]:
#Création du dossier horodaté
path_immo=path+"/"+date
os.makedirs(path_immo, exist_ok=True)

# DOWNLOAD STEPS

## Téléchargement du fichier compressé (gz)

In [None]:
##### Déclaration de l'url
url="https://files.data.gouv.fr/geo-dvf/latest/csv/"

while True:
    url_test = requests.get(url+str(year)+"/full.csv.gz", stream=True)
    if url_test.status_code==200:  #test reponse du serveur
        print("le fichier le plus récent est :",url+str(year)+"/full.csv.gz")
        print("Téléchargement en cours.")
        with requests.get(url+str(year)+"/full.csv.gz", stream=True) as r:
            with open(path_immo+"/full.csv.gz", "wb") as f:
                shutil.copyfileobj(r.raw, f) #Enregistrement du fichier (méthode + rapide)
        print("Téléchargement terminé.")
        break
    else:
        year-=1
    if year==1990:
        print("Erreur de connection.")
        print("Les données immobilières n'ont pas été mises à jour!")
        print("Contactez le noob de la maintenance!")
        break

## Décompression du fichier

In [None]:
#Décompression du fichier et gestion d'erreur
while True:
    try:
        print("Décompression du fichier.")
        with gzip.open(path_immo+"/full.csv.gz", 'rb') as f_in:
            with open(path_immo+"/extracted_full.csv", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        print("Décompression terminée.")
        break
    except:
        print("Erreur lors de la décompression.")
        print("Le fichier est peut être manquent ou corrompus.")
        print("Contactez le noob de la maintenance!")
        break

# TRANSFORM STEP

In [27]:
#Création du df
df=pd.read_csv(path_immo+"/extracted_full.csv")
df=df[['id_mutation', 'date_mutation', 'valeur_fonciere', 'adresse_numero', 'adresse_suffixe', 'adresse_nom_voie',\
        'code_postal', 'nom_commune', 'type_local']].copy()

  df=pd.read_csv(path_immo+"/extracted_full.csv")


## Général

In [28]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3379232 entries, 0 to 3379231
Data columns (total 9 columns):
 #   Column            Dtype  
---  ------            -----  
 0   id_mutation       object 
 1   date_mutation     object 
 2   valeur_fonciere   float64
 3   adresse_numero    float64
 4   adresse_suffixe   object 
 5   adresse_nom_voie  object 
 6   code_postal       float64
 7   nom_commune       object 
 8   type_local        object 
dtypes: float64(3), object(6)
memory usage: 232.0+ MB


In [29]:
#Drop dupliactes
df.drop_duplicates(subset=['id_mutation'],inplace=True)

In [30]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1282421 entries, 0 to 3379231
Data columns (total 9 columns):
 #   Column            Non-Null Count    Dtype  
---  ------            --------------    -----  
 0   id_mutation       1282421 non-null  object 
 1   date_mutation     1282421 non-null  object 
 2   valeur_fonciere   1279512 non-null  float64
 3   adresse_numero    935809 non-null   float64
 4   adresse_suffixe   65049 non-null    object 
 5   adresse_nom_voie  1269621 non-null  object 
 6   code_postal       1269701 non-null  float64
 7   nom_commune       1282421 non-null  object 
 8   type_local        861399 non-null   object 
dtypes: float64(3), object(6)
memory usage: 97.8+ MB


## Valeur_fonciere

In [31]:
#Drop des lignes valeur_foncière nulles
df.dropna(subset=['valeur_fonciere'], inplace=True)

In [32]:
df['valeur_fonciere']=df['valeur_fonciere'].astype(int)

## Adresse

In [33]:
#adresse_numero
df['adresse_numero']=df['adresse_numero'][df['adresse_numero'].notnull()].astype(int).astype(str)
#adresse_nom_voie
df['code_postal']=df['code_postal'][df['code_postal'].notnull()].astype(int).astype(str)

In [34]:
df=df.fillna('none')

In [35]:
#Format adresse_nom_voie
df['adresse_nom_voie']=df['adresse_nom_voie'].str.title()

In [36]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1279512 entries, 0 to 3379231
Data columns (total 9 columns):
 #   Column            Non-Null Count    Dtype 
---  ------            --------------    ----- 
 0   id_mutation       1279512 non-null  object
 1   date_mutation     1279512 non-null  object
 2   valeur_fonciere   1279512 non-null  int64 
 3   adresse_numero    1279512 non-null  object
 4   adresse_suffixe   1279512 non-null  object
 5   adresse_nom_voie  1279512 non-null  object
 6   code_postal       1279512 non-null  object
 7   nom_commune       1279512 non-null  object
 8   type_local        1279512 non-null  object
dtypes: int64(1), object(8)
memory usage: 97.6+ MB


# LOAD

In [37]:
#Base.py
#Create engine db with alchemy
engine=create_engine("mysql+pymysql://root:Root01Oracle!@localhost:3306/etl_db")
#DB base
Base = declarative_base()

In [38]:
#Tables.py

class ppr_values_temp(Base):
    __table_args__ = {'extend_existing': True}
    __tablename__ = 'ppr_values_temp'
    id_mutation = Column(String(55), primary_key=True)
    date_mutation = Column(String(55))
    valeur_fonciere = Column(String(15), primary_key=True)
    adresse_numero = Column(String(10))
    adresse_suffixe = Column(String(15))
    adresse_nom_voie = Column(String(55))
    code_postal = Column(String(10))
    nom_commune = Column(String(55))
    type_local = Column(String(55))
    id_property = column_property(id_mutation + ',' + valeur_fonciere)

class ppr_values_clean(Base):
    __table_args__ = {'extend_existing': True}
    __tablename__ = 'ppr_values_clean'
    id_mutation = Column(String(55), primary_key=True)
    date_mutation = Column(String(55))
    valeur_fonciere = Column(String(15), primary_key=True)
    adresse_numero = Column(String(10))
    adresse_suffixe = Column(String(15))
    adresse_nom_voie = Column(String(55))
    code_postal = Column(String(10))
    nom_commune = Column(String(55))
    type_local = Column(String(55))
    id_property = column_property(id_mutation + ',' + valeur_fonciere)


In [39]:
#Create_tables.py
for table in Base.metadata.tables:
    print(table)

if __name__=="__main__":
    Base.metadata.create_all(engine)

ppr_values_temp
ppr_values_clean


In [40]:
########################
### Peuplage Temp
########################

In [41]:
rows=df.to_dict('records')

In [42]:
session=Session(engine)

for row in rows:
    obj=ppr_values_temp(
        id_mutation=row['id_mutation'],
        date_mutation=row['date_mutation'],
        valeur_fonciere=row['valeur_fonciere'],
        adresse_numero=row['adresse_numero'],
        adresse_suffixe=row['adresse_suffixe'],
        adresse_nom_voie=row['adresse_nom_voie'],
        code_postal=row['code_postal'],
        nom_commune=row['nom_commune'],
        type_local=row['type_local']
    )
    session.add(obj)

In [43]:
session.commit()

In [44]:
session.close()

In [45]:
print("OK")

OK


In [46]:
####################
#### Inserts
####################

In [50]:
#Insert dans la seconde table (add colonne property)
session=Session(engine)

cleaned_ppr_values=session.query(ppr_values_clean.id_property)

print("******************")

changes_to_insert = session.query(ppr_values_temp.id_mutation,ppr_values_temp.date_mutation,ppr_values_temp.valeur_fonciere,ppr_values_temp.adresse_numero,ppr_values_temp.adresse_suffixe,ppr_values_temp.adresse_nom_voie,ppr_values_temp.code_postal,ppr_values_temp.nom_commune,ppr_values_temp.type_local).filter(~ppr_values_temp.id_property.in_(cleaned_ppr_values))
print('changes :')
for x in changes_to_insert[0:10]:
    print(x)
count=0
for x in changes_to_insert:
    count+=1
print("counts :", count)

    
print("***************")    
stm=insert(ppr_values_clean).from_select(['id_mutation','date_mutation','valeur_fonciere','adresse_numero','adresse_suffixe','adresse_nom_voie','code_postal','nom_commune','type_local'],changes_to_insert)

******************
changes :
('2021-1', '2021-01-05', '185000', '5080', 'none', 'Che De Vogelas', '1370', 'Val-Revermont', 'Maison')
('2021-10', '2021-01-08', '185000', '46', 'none', 'Rue Des Granges Bonnet', '1960', 'Péronnas', 'Maison')
('2021-100', '2021-01-11', '140000', '241', 'none', 'Rte De La Vieillere Haute', '1270', 'Beaupont', 'Maison')
('2021-1000', '2021-04-01', '72150', '3', 'none', 'Rue Henri Dunant', '1000', 'Bourg-en-Bresse', 'Dépendance')
('2021-10000', '2021-04-30', '137000', 'none', 'none', 'Chavagneux', '1090', 'Genouilleux', 'none')
('2021-100000', '2021-02-08', '250800', '5491', 'none', 'Rue Du Chateau D Eau', '11120', 'Saint-Marcel-sur-Aude', 'Maison')
('2021-1000000', '2021-11-03', '250150', '1021', 'none', 'Che De Travaux', '82370', 'Nohic', 'Maison')
('2021-1000001', '2021-10-22', '3000', 'none', 'none', 'Barraux', '82100', 'Castelsarrasin', 'none')
('2021-1000002', '2021-10-22', '150000', '327', 'none', 'Che De Laquete', '82410', 'Saint-Étienne-de-Tulmont', 

In [51]:
session.execute(stm)

<sqlalchemy.engine.cursor.CursorResult at 0x7f05f586e8b0>

In [52]:
session.commit()

In [53]:
session.close()

In [None]:
###################
### Test if changes
###################

#Effecuer manuellement des modifs dans la DB temp avant de tester

In [54]:
session=Session(engine)
#Insert dans la seconde table (add colonne property)

cleaned_ppr_values=session.query(ppr_values_clean.id_property)

print("******************")

changes_to_insert = session.query(ppr_values_temp.id_mutation,ppr_values_temp.date_mutation,ppr_values_temp.valeur_fonciere,ppr_values_temp.adresse_numero,ppr_values_temp.adresse_suffixe,ppr_values_temp.adresse_nom_voie,ppr_values_temp.code_postal,ppr_values_temp.nom_commune,ppr_values_temp.type_local).filter(~ppr_values_temp.id_property.in_(cleaned_ppr_values))
print('changes :')
for x in changes_to_insert:
    print(x)

    
print("***************")    
stm=insert(ppr_values_clean).from_select(['id_mutation','date_mutation','valeur_fonciere','adresse_numero','adresse_suffixe','adresse_nom_voie','code_postal','nom_commune','type_local'],changes_to_insert)

******************
changes :
('2021-10', '2021-01-08', 'LKJDQS', '46', 'none', 'Rue Des Granges Bonnet', '1960', 'Péronnas', 'Maison')
('AAAAAA', '2021-01-05', '185000', '5080', 'none', 'Che De Vogelas', '1370', 'Val-Revermont', 'Maison')
***************


In [55]:
session.execute(stm)
session.commit()
session.close()

In [None]:
###################
#### Delete
###################

In [56]:
session=Session(engine)
ppr_id=session.query(ppr_values_temp.id_property)
    
session.query(ppr_values_clean).filter(~ppr_values_clean.id_property.in_(ppr_id)).delete(synchronize_session='fetch')

2

In [57]:
session.commit()
session.close()