# Weather forecast - dataset generator

### In this notebook, climate data provided by the DWD (Deutscher Wetterdienst, Germany's national meteorological service) is transferred into a local SQLite database. The DWD provides this climate information free of charge (due to legal obligations) at the following address: https://opendata.dwd.de/climate_environment/CDC/

---

## Import of the required libraries

In [1]:
import fnmatch
import os
import pathlib
import re
import sqlite3
from datetime import datetime
from ftplib import FTP
from zipfile import ZipFile

## General settings/parameters

In [2]:
# switches controlling which parts of the data set are (re)built
CreateDatabase = True
IncludeHistoricalData = True
IncludeRecentData = True

# DWD station identifiers to import (feel free to add/remove stations)
stations = [
    '02483', '02437', '02315',
    '02261', '02171', '01975',
]

# first measurement year that is kept
# Note: keep this >= 1970 because every measurement time is also stored as a unix timestamp
recordStartYear = 2010

# general settings: FTP host, schema script, target database and download directory
path = 'opendata.dwd.de'
databaseScriptName = 'database.sql'
databaseName = 'weather.db'
workDirectory = str(pathlib.Path().resolve() / 'raw')

## Create an empty SQLite database using an SQL script

### Delete old SQLite database

In [3]:
# drop a previously generated database (in the current working directory) so the schema script starts from scratch
if CreateDatabase:
    dbFilePath = str(pathlib.Path().resolve() / databaseName)
    if os.path.exists(dbFilePath):
        os.remove(dbFilePath)

### Load SQL Script to create a new SQLite database

In [4]:
# read the SQL schema script that creates all tables of the local database
if CreateDatabase:
    sqlScript = pathlib.Path(databaseScriptName).read_text()

### Define functions to insert master data into the SQLite database

In [5]:
# define a function to insert the stations master data
def StationsMasterData(filePath, dbCursor):
    """Load station master data from a ';'-separated text file into the Stations table.

    Every line is stripped and runs of blanks are collapsed to a single blank.
    Only lines with exactly eight fields are used. The fields read are:
    0 = station id, 3 = sea level, 4 = latitude, 5 = longitude,
    6 = description and 7 = state. Fields 1 and 2 are ignored.
    Rows whose key already exists are skipped (INSERT OR IGNORE). A missing
    file is silently skipped. The caller is responsible for committing.

    Args:
        filePath: path of the station text file.
        dbCursor: sqlite3 cursor used for the inserts.
    """
    print('Insert stations master data into local database')
    if not os.path.exists(filePath):
        return

    insertSql = 'INSERT OR IGNORE INTO Stations(StationIdent, Description, State, SeaLevel, Latitude, Longitude) VALUES (?, ?, ?, ?, ?, ?)'
    with open(filePath, 'r') as handle:
        for rawLine in handle:
            fields = re.sub(' +', ' ', rawLine.strip()).split(';')
            if len(fields) != 8:
                continue  # header or malformed line
            dbCursor.execute(insertSql, (
                int(fields[0].strip()),     # station id
                fields[6].strip(),          # description
                fields[7].strip(),          # state
                float(fields[3].strip()),   # sea level
                float(fields[4].strip()),   # latitude
                float(fields[5].strip()),   # longitude
            ))

In [6]:
def WeatherPhenomenaMasterData(filePath, dbCursor):
    """Load weather phenomena master data from a ';'-separated file into WeatherPhenomena.

    Every line is stripped and runs of blanks are collapsed. Only lines with
    exactly four fields are used: field 0 is the phenomenon id and field 1 is
    its description. Existing ids are kept (INSERT OR IGNORE). A missing file
    is silently skipped. The caller is responsible for committing.

    Args:
        filePath: path of the phenomena text file.
        dbCursor: sqlite3 cursor used for the inserts.
    """
    print('Insert weather phenomena master data into local database')
    if not os.path.exists(filePath):
        return

    insertSql = 'INSERT OR IGNORE INTO WeatherPhenomena(WeatherPhenomenonIdent, Description) VALUES (?, ?)'
    with open(filePath, 'r') as phenomenaFile:
        for rawLine in phenomenaFile:
            fields = re.sub(' +', ' ', rawLine.strip()).split(';')
            if len(fields) != 4:
                continue  # header or malformed line
            dbCursor.execute(insertSql, (int(fields[0].strip()), fields[1].strip()))

In [7]:
def QualityLevelMasterData(filePath, dbCursor):
    """Load quality level master data from a ';'-separated file into QualityLevels.

    Every line is stripped and runs of blanks are collapsed. Only lines with
    exactly two fields (id; description) are used. Existing ids are kept
    (INSERT OR IGNORE). A missing file is silently skipped. The caller is
    responsible for committing.

    Args:
        filePath: path of the quality level text file.
        dbCursor: sqlite3 cursor used for the inserts.
    """
    print('Insert quality level master data into local database')
    if not os.path.exists(filePath):
        return

    insertSql = 'INSERT OR IGNORE INTO QualityLevels(QLIdent, Description) VALUES (?, ?)'
    with open(filePath, 'r') as levelFile:
        for rawLine in levelFile:
            fields = re.sub(' +', ' ', rawLine.strip()).split(';')
            if len(fields) != 2:
                continue  # malformed line
            dbCursor.execute(insertSql, (int(fields[0].strip()), fields[1].strip()))

In [8]:
def QualityByteMasterData(filePath, dbCursor):
    """Load quality byte master data from a ';'-separated file into QualityBytes.

    Every line is stripped and runs of blanks are collapsed. Only lines with
    exactly two fields (id; description) are used. Existing ids are kept
    (INSERT OR IGNORE). A missing file is silently skipped. The caller is
    responsible for committing.

    Args:
        filePath: path of the quality byte text file.
        dbCursor: sqlite3 cursor used for the inserts.
    """
    print('Insert quality byte master data into local database')
    if not os.path.exists(filePath):
        return

    insertSql = 'INSERT OR IGNORE INTO QualityBytes(QBIdent, Description) VALUES (?, ?)'
    with open(filePath, 'r') as byteFile:
        for rawLine in byteFile:
            fields = re.sub(' +', ' ', rawLine.strip()).split(';')
            if len(fields) != 2:
                continue  # malformed line
            dbCursor.execute(insertSql, (int(fields[0].strip()), fields[1].strip()))

### Create a new SQLite database

In [9]:
# execute the sql script and fill the master data tables
if CreateDatabase:
    db = sqlite3.connect(databaseName)
    try:
        cursor = db.cursor()
        cursor.executescript(sqlScript)
        db.commit()

        # insert master data; the files are read from the configured work directory
        # (workDirectory is defined in the settings cell and is not redefined here)
        StationsMasterData(os.path.join(workDirectory, 'stations.txt'), cursor)
        WeatherPhenomenaMasterData(os.path.join(workDirectory, 'weather_phenomena.txt'), cursor)
        QualityLevelMasterData(os.path.join(workDirectory, 'quality_levels.txt'), cursor)
        QualityByteMasterData(os.path.join(workDirectory, 'quality_bytes.txt'), cursor)
        db.commit()
    finally:
        # release the database file even if the schema script or an import fails
        db.close()

Insert stations master data into local database
Insert weather phenomena master data into local database
Insert quality level master data into local database
Insert quality byte master data into local database


## Auxiliary functions

In [10]:
def CreateDateTime(timestamp):
    """Convert a DWD 'YYYYMMDDHH' time string into a naive datetime.

    Example: '1950040105' -> 1950-04-01 05:00:00. Only the first ten characters
    are read; anything after them is ignored. A string that is too short or not
    numeric raises ValueError.
    """
    fieldBounds = ((0, 4), (4, 6), (6, 8), (8, 10))
    year, month, day, hour = (int(timestamp[start:stop]) for start, stop in fieldBounds)
    return datetime(year, month, day, hour)

In [11]:
def FilterFileList(filters, files):
    """Return the '.zip' entries of ``files`` that contain at least one of ``filters``.

    Each file is returned at most once, in its original order, even if it
    matches several filters or a filter is listed twice.

    Args:
        filters: substrings to look for (e.g. station identifiers).
        files: file names, e.g. from an FTP directory listing.

    Returns:
        List of matching zip file names.
    """
    return [
        file for file in files
        if file.endswith('.zip') and any(pattern in file for pattern in filters)
    ]

In [12]:
# dummy definition
def ProcessFileLine(line, offsetYear):
    """Template line processor; each data section below redefines this function.

    Args:
        line: list of field strings from one data line (field 0 = station id,
            field 1 = measurement time as 'YYYYMMDDHH').
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, timestamp, unixTimestamp) for a 6-field line measured in or
        after offsetYear. Returns None for lines with another field count, for
        older records and for lines that cannot be parsed.
    """
    try:
        if len(line) != 6:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # treat the naive measurement time as UTC: naive datetime.timestamp()
        # would interpret it as local time of the machine running the notebook
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, timestamp, unixTimestamp)
    except Exception:
        print('Error processing line:', line)
        return None

In [13]:
def _InsertArchiveRows(zipFilePath, fileFilter, offsetYear, sqlCommand, cursor, lineProcessor, encoding):
    """Parse the first member of a zip archive matching fileFilter and insert its rows.

    The first line of that member is a header and is skipped. Every other line is
    decoded with ``encoding`` and split on ';' into stripped fields. The fields are
    passed to ``lineProcessor(fields, offsetYear)``, and non-None results are
    inserted via ``cursor.execute(sqlCommand, result)``. Archives without a
    matching member are ignored.
    """
    with ZipFile(zipFilePath) as zf:
        matchingFiles = fnmatch.filter(zf.namelist(), fileFilter)
        if not matchingFiles:
            return
        # only the first matching file is processed
        with zf.open(matchingFiles[0], 'r') as datafile:
            for lineNumber, rawLine in enumerate(datafile):
                if lineNumber == 0:
                    continue  # header line
                fields = [x.strip() for x in rawLine.decode(encoding).rstrip().split(';')]
                result = lineProcessor(fields, offsetYear)
                if result is not None:
                    cursor.execute(sqlCommand, result)


def ProcessFTPData(files, fileFilter, offsetYear, sqlCommand, workDirectory, database, lineProcessor, utf8=True, ftpConnection=None):
    """Download zip archives via FTP and insert their parsed lines into SQLite.

    Each archive in ``files`` is retrieved from the FTP server's current working
    directory into ``workDirectory``. Its data file is parsed and inserted, and
    the local copy is removed again. Errors for one archive are printed, and
    processing continues with the next archive. The caller commits.

    Args:
        files: archive names relative to the FTP working directory.
        fileFilter: fnmatch pattern selecting the data file inside each zip.
        offsetYear: forwarded to ``lineProcessor``.
        sqlCommand: parameterized SQL statement executed for every returned row.
        workDirectory: local directory for the temporary downloads.
        database: open sqlite3 connection.
        lineProcessor: callable(fields, offsetYear) returning a row tuple or None.
        utf8: decode lines as UTF-8 when True, otherwise as ISO-8859-1.
        ftpConnection: logged-in ftplib.FTP object; defaults to the notebook's
            global ``ftp`` connection for backward compatibility.
    """
    if ftpConnection is None:
        ftpConnection = ftp
    encoding = 'utf-8' if utf8 else 'ISO-8859-1'
    fileCount = len(files)

    cursor = database.cursor()
    try:
        # process each filtered file (only selected stations)
        for fileIndex, file in enumerate(files, start=1):
            print('Process file', fileIndex, 'of', fileCount, '-', file)
            zipFilePath = os.path.join(workDirectory, file)
            try:
                # the with-block closes (and flushes) the download before ZipFile reads it
                with open(zipFilePath, 'wb') as localFile:
                    ftpConnection.retrbinary("RETR " + file, localFile.write)
                _InsertArchiveRows(zipFilePath, fileFilter, offsetYear, sqlCommand, cursor, lineProcessor, encoding)
            except Exception as e:
                print('An error occured while processing file', file)
                print('Error:', e)
            finally:
                # remove the temporary download even when processing failed
                if os.path.exists(zipFilePath):
                    try:
                        os.remove(zipFilePath)
                    except OSError as e:
                        print('Could not remove temporary file', zipFilePath, '-', e)
    finally:
        cursor.close()

---

## Connect to DWD FTP Server and local SQLite database

In [14]:
# open the local SQLite database that receives all measurements
db = sqlite3.connect(databaseName)

# connect to the DWD open data FTP host on the default port and log in anonymously
ftp = FTP(host=path)
ftp.login()
ftp.getwelcome()

"220 Welcome to DWD's opendata FTP service."

---

# Air Temperature

### Station observations of 2m air temperature and humidity for Germany

## General settings/parameters

In [15]:
# FTP directories, data file pattern and insert statement for hourly air temperature
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/air_temperature/recent',
)
sqlCommand = (
    'INSERT OR IGNORE INTO AirTemperature'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, TT_TU, RF_TU) '
    'VALUES (?, ?, ?, ?, ?, ?)'
)
dataFileFilter = 'produkt_tu_stunde*.txt'

In [16]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly air temperature file into an AirTemperature row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_9 quality level; TT_TU 2m air temperature; RF_TU 2m relative humidity;
    end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_9, timestamp, unixTimestamp, tt_tu, rf_tu), or None when
        the line does not have 6 fields, is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 6:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_9 = int(line[2])  # quality level of next columns
        tt_tu = float(line[3])  # 2m air temperature
        rf_tu = float(line[4])  # 2m relative humidity
        # line[5] is the end-of-record marker and is not stored

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_9, timestamp, unixTimestamp, tt_tu, rf_tu)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [17]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_TU_01975_19490101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_TU_02171_19510101_20211231_hist.zip
Process file 3 of 6 - stundenwerte_TU_02261_19480101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_TU_02315_20021101_20211231_hist.zip
Process file 5 of 6 - stundenwerte_TU_02437_20020101_20211231_hist.zip
Process file 6 of 6 - stundenwerte_TU_02483_19510101_20211231_hist.zip


## Recent data

In [18]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_TU_01975_akt.zip
Process file 2 of 6 - stundenwerte_TU_02171_akt.zip
Process file 3 of 6 - stundenwerte_TU_02261_akt.zip
Process file 4 of 6 - stundenwerte_TU_02315_akt.zip
Process file 5 of 6 - stundenwerte_TU_02437_akt.zip
Process file 6 of 6 - stundenwerte_TU_02483_akt.zip


---

# Cloudiness

### Station observations of cloudiness for Germany

## General settings/parameters

In [19]:
# FTP directories, data file pattern and insert statement for hourly cloudiness
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/cloudiness/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/cloudiness/recent',
)
dataFileFilter = 'produkt_n_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO Cloudiness'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, V_N_I, V_N) '
    'VALUES (?, ?, ?, ?, ?, ?)'
)

In [20]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly cloudiness file into a Cloudiness row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_8 quality level; V_N_I (kept as text); V_N (integer); end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_8, timestamp, unixTimestamp, v_n_i, v_n), or None when
        the line does not have 6 fields, is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 6:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_8 = int(line[2])  # quality level of next columns
        v_n_i = line[3].strip()
        v_n = int(line[4])
        # line[5] is the end-of-record marker and is not stored

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_8, timestamp, unixTimestamp, v_n_i, v_n)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [21]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_N_01975_19490101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_N_02171_19520901_20211231_hist.zip
Process file 3 of 6 - stundenwerte_N_02261_19490101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_N_02315_19911101_20211231_hist.zip
Process file 5 of 6 - stundenwerte_N_02437_19790101_20211231_hist.zip
Process file 6 of 6 - stundenwerte_N_02483_19541207_20211231_hist.zip


## Recent data

In [22]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_N_01975_akt.zip
Process file 2 of 6 - stundenwerte_N_02171_akt.zip
Process file 3 of 6 - stundenwerte_N_02261_akt.zip
Process file 4 of 6 - stundenwerte_N_02315_akt.zip
Process file 5 of 6 - stundenwerte_N_02437_akt.zip
Process file 6 of 6 - stundenwerte_N_02483_akt.zip


---

# Dew point

### Station observations of air and dew point temperature 2 m above ground in °C for Germany

## General settings/parameters

In [23]:
# FTP directories, data file pattern and insert statement for hourly dew point data
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/dew_point/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/dew_point/recent',
)
dataFileFilter = 'produkt_td_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO DewPoint'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, TT, TD) '
    'VALUES (?, ?, ?, ?, ?, ?)'
)

In [24]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly dew point file into a DewPoint row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_8 quality level; TT; TD; end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_8, timestamp, unixTimestamp, tt, td), or None when the
        line does not have 6 fields, is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 6:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_8 = int(line[2])  # quality level of next columns
        tt = float(line[3])
        td = float(line[4])
        # line[5] is the end-of-record marker and is not stored

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_8, timestamp, unixTimestamp, tt, td)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [25]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_TD_01975_19490101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_TD_02171_19520901_20211231_hist.zip
Process file 3 of 6 - stundenwerte_TD_02261_19490101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_TD_02315_19911101_20211231_hist.zip
Process file 5 of 6 - stundenwerte_TD_02437_19790101_20211231_hist.zip
Process file 6 of 6 - stundenwerte_TD_02483_19541207_20211231_hist.zip


## Recent data

In [26]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_TD_01975_akt.zip
Process file 2 of 6 - stundenwerte_TD_02171_akt.zip
Process file 3 of 6 - stundenwerte_TD_02261_akt.zip
Process file 4 of 6 - stundenwerte_TD_02315_akt.zip
Process file 5 of 6 - stundenwerte_TD_02437_akt.zip
Process file 6 of 6 - stundenwerte_TD_02483_akt.zip


---

# Moisture

## General settings/parameters

In [27]:
# FTP directories, data file pattern and insert statement for hourly moisture data
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/moisture/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/moisture/recent',
)
dataFileFilter = 'produkt_tf_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO Moisture'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, ABSF_STD, VP_STD, TF_STD, P_STD, TT_STD, RF_STD, TD_STD) '
    'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
)

In [28]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly moisture file into a Moisture row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_8 quality level; ABSF_STD; VP_STD; TF_STD; P_STD; TT_STD; RF_STD;
    TD_STD; end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_8, timestamp, unixTimestamp, absf_std, vp_std, tf_std,
        p_std, tt_std, rf_std, td_std), or None when the line does not have
        11 fields, is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 11:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_8 = int(line[2])  # quality level of next columns
        absf_std = float(line[3])
        vp_std = float(line[4])
        tf_std = float(line[5])
        p_std = float(line[6])
        tt_std = float(line[7])
        rf_std = float(line[8])
        td_std = float(line[9])
        # line[10] is the end-of-record marker and is not stored

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_8, timestamp, unixTimestamp, absf_std, vp_std, tf_std, p_std, tt_std, rf_std, td_std)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [29]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_TF_01975_19490101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_TF_02171_19520901_20211231_hist.zip
Process file 3 of 6 - stundenwerte_TF_02261_19490101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_TF_02315_19911101_20211231_hist.zip
Process file 5 of 6 - stundenwerte_TF_02437_19790101_20211231_hist.zip
Process file 6 of 6 - stundenwerte_TF_02483_19541207_20211231_hist.zip


## Recent data

In [30]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_TF_01975_akt.zip
Process file 2 of 6 - stundenwerte_TF_02171_akt.zip
Process file 3 of 6 - stundenwerte_TF_02261_akt.zip
Process file 4 of 6 - stundenwerte_TF_02315_akt.zip
Process file 5 of 6 - stundenwerte_TF_02437_akt.zip
Process file 6 of 6 - stundenwerte_TF_02483_akt.zip


---

# Precipitation

### Station observations of precipitation for Germany

## General settings/parameters

In [31]:
# FTP directories, data file pattern and insert statement for hourly precipitation
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/precipitation/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent',
)
dataFileFilter = 'produkt_rr_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO Precipitation'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, R1, RS_IND) '
    'VALUES (?, ?, ?, ?, ?, ?)'
)

In [32]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly precipitation file into a Precipitation row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_8 quality level; R1; RS_IND; WRTR (ignored); end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_8, timestamp, unixTimestamp, r1, rs_ind), or None when
        the line does not have 7 fields, is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 7:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_8 = int(line[2])  # quality level of next columns
        r1 = float(line[3])
        rs_ind = float(line[4])
        # line[5] (WRTR) is ignored here; line[6] is the end-of-record marker

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_8, timestamp, unixTimestamp, r1, rs_ind)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [33]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 4 - stundenwerte_RR_01975_19950905_20211231_hist.zip
Process file 2 of 4 - stundenwerte_RR_02171_19950901_20211231_hist.zip
Process file 3 of 4 - stundenwerte_RR_02261_19991202_20211231_hist.zip
Process file 4 of 4 - stundenwerte_RR_02483_19951012_20211231_hist.zip


## Recent data

In [34]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 4 - stundenwerte_RR_01975_akt.zip
Process file 2 of 4 - stundenwerte_RR_02171_akt.zip
Process file 3 of 4 - stundenwerte_RR_02261_akt.zip
Process file 4 of 4 - stundenwerte_RR_02483_akt.zip


---

# Pressure

### Station observations of pressure for Germany

## General settings/parameters

In [35]:
# FTP directories, data file pattern and insert statement for hourly pressure data
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/pressure/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/pressure/recent',
)
dataFileFilter = 'produkt_p0_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO Pressure'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, P, P0) '
    'VALUES (?, ?, ?, ?, ?, ?)'
)


In [36]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly pressure file into a Pressure row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_8 quality level; P; P0; end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_8, timestamp, unixTimestamp, p, p0), or None when the
        line does not have 6 fields, is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 6:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_8 = int(line[2])  # quality level of next columns
        p = float(line[3])
        p0 = float(line[4])
        # line[5] is the end-of-record marker and is not stored

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_8, timestamp, unixTimestamp, p, p0)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [37]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_P0_01975_19490101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_P0_02171_19520901_20211231_hist.zip
Process file 3 of 6 - stundenwerte_P0_02261_19490101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_P0_02315_19911101_20211231_hist.zip
Process file 5 of 6 - stundenwerte_P0_02437_19790101_20211231_hist.zip
Process file 6 of 6 - stundenwerte_P0_02483_19541207_20211231_hist.zip


## Recent data

In [38]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_P0_01975_akt.zip
Process file 2 of 6 - stundenwerte_P0_02171_akt.zip
Process file 3 of 6 - stundenwerte_P0_02261_akt.zip
Process file 4 of 6 - stundenwerte_P0_02315_akt.zip
Process file 5 of 6 - stundenwerte_P0_02437_akt.zip
Process file 6 of 6 - stundenwerte_P0_02483_akt.zip


---

# Soil Temperature

### Station observations of soil temperature for Germany

## General settings/parameters

In [39]:
# FTP directories, data file pattern and insert statement for hourly soil temperature
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/soil_temperature/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/soil_temperature/recent',
)
dataFileFilter = 'produkt_eb_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO SoilTemperature'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, V_TE002, V_TE005, V_TE010, V_TE020, V_TE050, V_TE100) '
    'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
)

In [40]:
def ProcessFileLine(line, offsetYear):
    """Convert one split line of an hourly soil temperature file into a SoilTemperature row.

    Field layout as read here: station id; measurement time (YYYYMMDDHH);
    QN_2 quality level; V_TE002; V_TE005; V_TE010; V_TE020; V_TE050; V_TE100;
    end-of-record marker.

    Args:
        line: list of field strings from one data line.
        offsetYear: records measured before this year are skipped.

    Returns:
        (stationID, qn_2, timestamp, unixTimestamp, v_te002, v_te005, v_te010,
        v_te020, v_te050, v_te100), or None when the line does not have 10 fields,
        is older than offsetYear or cannot be parsed.
    """
    try:
        if len(line) != 10:
            return None
        stationID = int(line[0])  # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_2 = int(line[2])  # quality level of next columns
        v_te002 = float(line[3])
        v_te005 = float(line[4])
        v_te010 = float(line[5])
        v_te020 = float(line[6])
        v_te050 = float(line[7])
        v_te100 = float(line[8])
        # line[9] is the end-of-record marker and is not stored

        if timestamp.year < offsetYear:
            return None  # older than the requested start year

        # the value goes into DateMeasuredUnixUTC, so treat the naive time as UTC;
        # naive datetime.timestamp() would interpret it as the machine's local time
        unixTimestamp = int((timestamp - datetime(1970, 1, 1)).total_seconds())
        return (stationID, qn_2, timestamp, unixTimestamp, v_te002, v_te005, v_te010, v_te020, v_te050, v_te100)
    except Exception:
        print('Error processing line:', line)
        return None

## Historical data

In [41]:
if IncludeHistoricalData:
    # switch to the historical archive directory and list its contents
    ftp.cwd(pathHistorical)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_EB_01975_19610101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_EB_02171_19810101_20211231_hist.zip
Process file 3 of 6 - stundenwerte_EB_02261_19510101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_EB_02315_20010403_20211231_hist.zip
Process file 5 of 6 - stundenwerte_EB_02437_19920101_20211231_hist.zip
Process file 6 of 6 - stundenwerte_EB_02483_19810101_20211231_hist.zip


## Recent data

In [42]:
if IncludeRecentData:
    # switch to the directory with the recent archives and list its contents
    ftp.cwd(pathRecent)
    files = ftp.nlst()

    # keep only the zip archives of the configured stations
    filteredFiles = FilterFileList(stations, files)

    # download, parse and insert the selected archives
    ProcessFTPData(
        files=filteredFiles,
        fileFilter=dataFileFilter,
        offsetYear=recordStartYear,
        sqlCommand=sqlCommand,
        workDirectory=workDirectory,
        database=db,
        lineProcessor=ProcessFileLine,
    )

    # persist the inserted rows
    db.commit()

Process file 1 of 6 - stundenwerte_EB_01975_akt.zip
Process file 2 of 6 - stundenwerte_EB_02171_akt.zip
Process file 3 of 6 - stundenwerte_EB_02261_akt.zip
Process file 4 of 6 - stundenwerte_EB_02315_akt.zip
Process file 5 of 6 - stundenwerte_EB_02437_akt.zip
Process file 6 of 6 - stundenwerte_EB_02483_akt.zip


---

# Sun

### Station observations of sunshine duration for Germany

## General settings/parameters

In [43]:
# FTP directories, data file pattern and insert statement for hourly sunshine duration
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/sun/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/sun/recent',
)
dataFileFilter = 'produkt_sd_stunde*.txt'
sqlCommand = (
    'INSERT OR IGNORE INTO Sun'
    '(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, SD_SO) '
    'VALUES (?, ?, ?, ?, ?)'
)

In [44]:
def ProcessFileLine(line, offsetYear):
    """Convert one record of an hourly sunshine-duration file into a Sun table row.

    Parameters
    ----------
    line : sequence of str
        Fields of one record (presumably already split on the file's separator
        by ProcessFTPData): station id, measurement time, quality level (QN_7),
        sunshine duration (SD_SO) and the end-of-record marker.
    offsetYear : int
        Records whose measurement year is earlier than this are skipped.

    Returns
    -------
    tuple or None
        ``(stationID, qn_7, timestamp, unixTimestamp, sd_so)`` in the column
        order of the Sun INSERT statement, or ``None`` when the record does not
        have exactly five fields, is older than ``offsetYear`` or cannot be parsed.
    """
    try:
        # anything other than exactly five fields is not a sunshine record
        if len(line) != 5:
            return None

        stationID = int(line[0])             # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_7 = int(line[2])                  # quality level of the following columns
        sd_so = float(line[3])               # sunshine duration (SD_SO)
        # line[4] is the end-of-record marker and is not stored

        if int(timestamp.year) < offsetYear:
            # record predates the requested start year
            return None

        # NOTE(review): if CreateDateTime returns a naive datetime, .timestamp()
        # interprets it in the kernel's local time zone, so this value is only
        # true UTC when the kernel runs in UTC - confirm, and change it for all
        # tables at once so their Unix timestamps stay comparable.
        unixTimestamp = int(timestamp.timestamp())

        return (stationID, qn_7, timestamp, unixTimestamp, sd_so)
    except Exception:
        # Skip unparsable records, but (unlike a bare ``except:``) let
        # KeyboardInterrupt/SystemExit propagate so a long import can be stopped.
        print('Error processing line:', line)
        return None

## Historical data

In [45]:
if IncludeHistoricalData:
    # Import the historical archive of this dataset: change into its
    # directory, list the archives, keep those of the selected stations
    # and load their records into SQLite.
    ftp.cwd(pathHistorical)
    files = ftp.nlst()
    filteredFiles = FilterFileList(stations, files)
    ProcessFTPData(
        filteredFiles,
        dataFileFilter,
        recordStartYear,
        sqlCommand,
        workDirectory,
        db,
        ProcessFileLine,
    )

    db.commit()  # persist this dataset's rows before continuing

Process file 1 of 5 - stundenwerte_SD_01975_19490101_20211231_hist.zip
Process file 2 of 5 - stundenwerte_SD_02171_19510101_20211231_hist.zip
Process file 3 of 5 - stundenwerte_SD_02261_19510101_20211231_hist.zip
Process file 4 of 5 - stundenwerte_SD_02437_19880301_20061231_hist.zip
Process file 5 of 5 - stundenwerte_SD_02483_19610101_20211231_hist.zip


## Recent data

In [46]:
if IncludeRecentData:
    # Import the "recent" files of this dataset: change into the recent
    # directory, list it, keep only the archives of the selected stations
    # and load their records into SQLite.
    ftp.cwd(pathRecent)
    files = ftp.nlst()
    filteredFiles = FilterFileList(stations, files)
    ProcessFTPData(
        filteredFiles,
        dataFileFilter,
        recordStartYear,
        sqlCommand,
        workDirectory,
        db,
        ProcessFileLine,
    )

    db.commit()  # persist this dataset's rows before the next section runs

Process file 1 of 4 - stundenwerte_SD_01975_akt.zip
Process file 2 of 4 - stundenwerte_SD_02171_akt.zip
Process file 3 of 4 - stundenwerte_SD_02261_akt.zip
Process file 4 of 4 - stundenwerte_SD_02483_akt.zip


---

# Weather Phenomena

### Station observations of weather phenomena for Germany

## General settings/parameters

In [47]:
# DWD FTP directories of the hourly weather-phenomena data (historical archive, recent files)
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/weather_phenomena/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/weather_phenomena/recent',
)
# file-name pattern handed to ProcessFTPData to pick the measurement file
dataFileFilter = 'produkt_ww_stunde*.txt'
# parameterised insert; INSERT OR IGNORE skips rows that violate a table constraint
sqlCommand = 'INSERT OR IGNORE INTO WeatherPhenomenaData(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, WW) VALUES (?, ?, ?, ?, ?)'

In [48]:
def ProcessFileLine(line, offsetYear):
    """Convert one record of an hourly weather-phenomena file into a WeatherPhenomenaData row.

    Parameters
    ----------
    line : sequence of str
        Fields of one record (presumably already split on the file's separator
        by ProcessFTPData): station id, measurement time, quality level,
        phenomenon code (WW), phenomenon text (WW_Text) and the end-of-record marker.
    offsetYear : int
        Records whose measurement year is earlier than this are skipped.

    Returns
    -------
    tuple or None
        ``(stationID, qn_7, timestamp, unixTimestamp, ww)`` in the column order
        of the WeatherPhenomenaData INSERT statement, or ``None`` when the record
        does not have exactly six fields, is older than ``offsetYear`` or cannot
        be parsed.
    """
    try:
        # anything other than exactly six fields is not a weather-phenomena record
        if len(line) != 6:
            return None

        stationID = int(line[0])             # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_7 = int(line[2])                  # quality level of the following columns
        ww = int(line[3])                    # weather phenomenon code (WW)
        # line[4] (phenomenon text) and line[5] (end-of-record marker) are not stored

        if int(timestamp.year) < offsetYear:
            # record predates the requested start year
            return None

        # NOTE(review): if CreateDateTime returns a naive datetime, .timestamp()
        # interprets it in the kernel's local time zone, so this value is only
        # true UTC when the kernel runs in UTC - confirm, and change it for all
        # tables at once so their Unix timestamps stay comparable.
        unixTimestamp = int(timestamp.timestamp())

        return (stationID, qn_7, timestamp, unixTimestamp, ww)
    except Exception:
        # Skip unparsable records, but (unlike a bare ``except:``) let
        # KeyboardInterrupt/SystemExit propagate so a long import can be stopped.
        print('Error processing line:', line)
        return None

## Historical data

In [49]:
if IncludeHistoricalData:
    # Import the historical archive of this dataset: change into its
    # directory, list the archives, keep those of the selected stations
    # and load their records into SQLite.
    ftp.cwd(pathHistorical)
    files = ftp.nlst()
    filteredFiles = FilterFileList(stations, files)

    # utf8=False: presumably these files (which carry the WW_Text column) are
    # not UTF-8 encoded - see ProcessFTPData for the decoding actually used.
    ProcessFTPData(
        filteredFiles,
        dataFileFilter,
        recordStartYear,
        sqlCommand,
        workDirectory,
        db,
        ProcessFileLine,
        utf8=False,
    )

    db.commit()  # persist this dataset's rows before continuing

Process file 1 of 4 - stundenwerte_WW_01975_19950905_20211231_hist.zip
Process file 2 of 4 - stundenwerte_WW_02171_19950901_20211231_hist.zip
Process file 3 of 4 - stundenwerte_WW_02261_19991202_20211231_hist.zip
Process file 4 of 4 - stundenwerte_WW_02483_19951012_20211231_hist.zip


## Recent data

In [50]:
if IncludeRecentData:
    # Import the "recent" files of this dataset: change into the recent
    # directory, list it, keep only the archives of the selected stations
    # and load their records into SQLite.
    ftp.cwd(pathRecent)
    files = ftp.nlst()
    filteredFiles = FilterFileList(stations, files)

    # utf8=False: presumably these files (which carry the WW_Text column) are
    # not UTF-8 encoded - see ProcessFTPData for the decoding actually used.
    ProcessFTPData(
        filteredFiles,
        dataFileFilter,
        recordStartYear,
        sqlCommand,
        workDirectory,
        db,
        ProcessFileLine,
        utf8=False,
    )

    db.commit()  # persist this dataset's rows before the next section runs

Process file 1 of 4 - stundenwerte_WW_01975_akt.zip
Process file 2 of 4 - stundenwerte_WW_02171_akt.zip
Process file 3 of 4 - stundenwerte_WW_02261_akt.zip
Process file 4 of 4 - stundenwerte_WW_02483_akt.zip


---

# Wind

### Station observations of wind speed and wind direction for Germany

## General settings/parameters

In [51]:
# DWD FTP directories of the hourly wind data (historical archive, recent files)
pathHistorical, pathRecent = (
    '/climate_environment/CDC/observations_germany/climate/hourly/wind/historical',
    '/climate_environment/CDC/observations_germany/climate/hourly/wind/recent',
)
# file-name pattern handed to ProcessFTPData to pick the measurement file
dataFileFilter = 'produkt_ff_stunde*.txt'
# parameterised insert; INSERT OR IGNORE skips rows that violate a table constraint
sqlCommand = 'INSERT OR IGNORE INTO Wind(StationIdent, QLIdent, DateMeasuredUTC, DateMeasuredUnixUTC, FF, DD) VALUES (?, ?, ?, ?, ?, ?)'

In [52]:
def ProcessFileLine(line, offsetYear):
    """Convert one record of an hourly wind file into a Wind table row.

    Parameters
    ----------
    line : sequence of str
        Fields of one record (presumably already split on the file's separator
        by ProcessFTPData): station id, measurement time, quality level (QN_3),
        wind speed (FF), wind direction (DD) and the end-of-record marker.
    offsetYear : int
        Records whose measurement year is earlier than this are skipped.

    Returns
    -------
    tuple or None
        ``(stationID, qn_3, timestamp, unixTimestamp, ff, dd)`` in the column
        order of the Wind INSERT statement, or ``None`` when the record does not
        have exactly six fields, is older than ``offsetYear`` or cannot be parsed.
    """
    try:
        # anything other than exactly six fields is not a wind record
        if len(line) != 6:
            return None

        stationID = int(line[0])             # station identification number
        timestamp = CreateDateTime(line[1])  # measurement time
        qn_3 = int(line[2])                  # quality level of the following columns
        ff = float(line[3])                  # wind speed (FF)
        dd = int(line[4])                    # wind direction (DD)
        # line[5] is the end-of-record marker and is not stored

        if int(timestamp.year) < offsetYear:
            # record predates the requested start year
            return None

        # NOTE(review): if CreateDateTime returns a naive datetime, .timestamp()
        # interprets it in the kernel's local time zone, so this value is only
        # true UTC when the kernel runs in UTC - confirm, and change it for all
        # tables at once so their Unix timestamps stay comparable.
        unixTimestamp = int(timestamp.timestamp())

        return (stationID, qn_3, timestamp, unixTimestamp, ff, dd)
    except Exception:
        # Skip unparsable records, but (unlike a bare ``except:``) let
        # KeyboardInterrupt/SystemExit propagate so a long import can be stopped.
        print('Error processing line:', line)
        return None

## Historical data

In [53]:
if IncludeHistoricalData:
    # Import the historical archive of this dataset: change into its
    # directory, list the archives, keep those of the selected stations
    # and load their records into SQLite.
    ftp.cwd(pathHistorical)
    files = ftp.nlst()
    filteredFiles = FilterFileList(stations, files)
    ProcessFTPData(
        filteredFiles,
        dataFileFilter,
        recordStartYear,
        sqlCommand,
        workDirectory,
        db,
        ProcessFileLine,
    )

    db.commit()  # persist this dataset's rows before continuing

Process file 1 of 6 - stundenwerte_FF_01975_19500101_20211231_hist.zip
Process file 2 of 6 - stundenwerte_FF_02171_19760401_20211231_hist.zip
Process file 3 of 6 - stundenwerte_FF_02261_19480101_20211231_hist.zip
Process file 4 of 6 - stundenwerte_FF_02315_20090101_20211231_hist.zip
Process file 5 of 6 - stundenwerte_FF_02437_19710101_20080131_hist.zip
Process file 6 of 6 - stundenwerte_FF_02483_19690101_20211231_hist.zip


## Recent data

In [54]:
if IncludeRecentData:
    # Import the "recent" files of this dataset: change into the recent
    # directory, list it, keep only the archives of the selected stations
    # and load their records into SQLite.
    ftp.cwd(pathRecent)
    files = ftp.nlst()
    filteredFiles = FilterFileList(stations, files)
    ProcessFTPData(
        filteredFiles,
        dataFileFilter,
        recordStartYear,
        sqlCommand,
        workDirectory,
        db,
        ProcessFileLine,
    )

    db.commit()  # persist this dataset's rows before the clean-up step

Process file 1 of 5 - stundenwerte_FF_01975_akt.zip
Process file 2 of 5 - stundenwerte_FF_02171_akt.zip
Process file 3 of 5 - stundenwerte_FF_02261_akt.zip
Process file 4 of 5 - stundenwerte_FF_02315_akt.zip
Process file 5 of 5 - stundenwerte_FF_02483_akt.zip


--- 

# Clean up

In [55]:
# Release both connections. ftplib's quit() may raise (e.g. when the server
# rejects QUIT or has already dropped the session), so the database is
# committed and closed in ``finally`` whatever happens to the FTP connection.
try:
    # polite shutdown: sends QUIT, then closes the connection
    ftp.quit()
except Exception as err:
    # QUIT failed; close our side of the connection unilaterally
    print('Could not quit FTP session cleanly, closing connection:', err)
    ftp.close()
finally:
    # close db connection
    db.commit()
    db.close()