In [1]:
import os
import psycopg2
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from glob import glob

### Класс для обработки файлов

In [2]:
class OSM:
    
    def process_txt(self, spark, prefix, filename, parquet_name):
        # read file to rdd
        rdd = spark.sparkContext.textFile("file:///" + prefix + filename)
        # get headers
        initial_header = rdd.take(1)[0]
        final_header = initial_header
        final_header = final_header.split(";")
        final_header = [final_header[5], "PAR_OSM_ID", final_header[0], final_header[3]]  
        # remove headers from rdd
        rdd_no_header = rdd.filter(lambda l: l != initial_header)
        # process rdd lines
        rdd_processed = rdd_no_header.map(lambda l: self.process_rdd_row(l, initial_header))
        # generate dataframe
        df = self.generate_df(rdd_processed, final_header)
        # append df to the final parquet
        self.write_df_to_parquet(df, parquet_name)
        
    def process_rdd_row(self, line, header):
        cols = header.split(";")
        vals = line.split(";")
        ADMIN_LVL = int(vals[cols.index("ADMIN_LVL")])
        i = 1
        while True:
            if ADMIN_LVL == i:
                vals.append("0")
                break
            if vals[cols.index(f"ADMIN_L{ADMIN_LVL - i}D")] in ["None", ""]:
                i += 1
            else:
                vals.append(str(int(float(vals[cols.index(f"ADMIN_L{ADMIN_LVL - i}D")]))))
                break
                
        return ";".join([vals[5], vals[-1], vals[0], vals[3]])
    
    def generate_df_row(self, vals, cols):
        d = {}
        for col, val in zip(cols, vals.split(";")):
            d[col] = val
            
        return d
    
    def generate_df(self, rdd, header):
        return rdd.map(lambda l: Row(**self.generate_df_row(l, header))).toDF()
    
    def remove_old_parquet(self, parquet_name):
        try:
            shutil.rmtree(parquet_name)
        except:
            pass
    
    def write_df_to_parquet(self, df, parquet_name):
        df.write.mode("append").parquet(parquet_name)

### Класс для работы с PostgreSQL

In [3]:
class OSMDB:
    
    def __init__(self):
        self.conn = psycopg2.connect(
           database="osmdb", user='postgres', password='password', host='127.0.0.1', port='5432'
        )
        self.cursor = self.conn.cursor()
        self.create_table()

    
    def execute_and_commit(self, sql):
        try:
           # Executing the SQL command
           self.cursor.execute(sql)

           # Commit your changes in the database
           self.conn.commit()

        except Exception as e:
            print(e.message)
           # Rolling back in case of error
            self.conn.rollback()
    
    
    def create_table(self):
        sql ='''CREATE TABLE IF NOT EXISTS OSM(
           OSM_ID VARCHAR(20) PRIMARY KEY,
           PAR_OSM_ID VARCHAR(20),
           NAME VARCHAR(255),
           ADMIN_LVL VARCHAR(5)
        )'''
        
        self.execute_and_commit(sql)
        
        
    def insert_data(self, data):
        for row in data:
            row_dict = row.asDict()
            sql = "INSERT INTO OSM ("
            keys = list(row_dict.keys())
            for i in range(len(keys)):
                sql += keys[i]
                if i < len(row_dict) - 1:
                    sql += ","
            sql += ") VALUES ("
            vals = list(row_dict.values())
            for i in range(len(vals)):
                sql += "'" + vals[i] + "'"
                if i < len(row_dict) - 1:
                    sql += ","
            sql += ") ON CONFLICT DO NOTHING"
            self.execute_and_commit(sql)
            
    
    def read_data(self):
        sql = "SELECT * FROM OSM ORDER BY ADMIN_LVL"
        self.execute_and_commit(sql)
        result = self.cursor.fetchall()
        print(f"Selected {len(result)} rows\n\nFirst 10 rows:")
        for i in result[:10]:
            print(i)
        print("\nLast 10 rows:")
        for i in result[-10:]:
            print(i)
    
    
    def close(self):
        self.cursor.close()
        self.conn.close()

### Считывание файлов и генерация parquet

In [4]:
prefix = os.path.abspath(".")

In [5]:
spark = SparkSession.builder \
            .master("local[*]") \
            .appName('OSM') \
            .getOrCreate()

In [6]:
to_process = sorted(glob("./data/*.txt"))
to_process

['./data/admin_lev2.txt',
 './data/admin_lev3.txt',
 './data/admin_lev4.txt',
 './data/admin_lev5.txt',
 './data/admin_lev6.txt',
 './data/admin_lev8.txt',
 './data/admin_lev9.txt']

In [7]:
osm = OSM()
osmdb = OSMDB()
parquet_name = "processed-osm.parquet"

In [8]:
osm.remove_old_parquet(parquet_name)
for i in to_process:
    osm.process_txt(spark, prefix, i[1:], parquet_name)

### Считывание parquet и запись в БД

In [9]:
df = spark.read.parquet("processed-osm.parquet")
df.head(10)

[Row(OSM_ID='5730494', PAR_OSM_ID='1445620', NAME='Мамское городское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5728524', PAR_OSM_ID='1464422', NAME='Ключи-Булакское сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5730491', PAR_OSM_ID='1445620', NAME='Витимское городское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5737571', PAR_OSM_ID='190110', NAME='Костинское сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5728538', PAR_OSM_ID='1464422', NAME='Тэмьское сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5738214', PAR_OSM_ID='190110', NAME='Солонецкое сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5742481', PAR_OSM_ID='190098', NAME='Бирюсинское сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5728525', PAR_OSM_ID='1464422', NAME='Кобинское сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5728533', PAR_OSM_ID='1464422', NAME='Прибрежнинское сельское поселение', ADMIN_LVL='8'),
 Row(OSM_ID='5728516', PAR_OSM_ID='1464422', NAME='Большеокинское сельское поселение', ADMIN_LVL='8')]

In [10]:
osmdb.insert_data(df.rdd.collect())
osmdb.read_data()

Selected 462 rows

First 10 rows:
('60189', '0', 'Россия', '2')
('1221148', '60189', 'Сибирский федеральный округ', '3')
('145454', '1221148', 'Иркутская область', '4')
('3438290', '145454', 'Усть-Ордынский Бурятский округ', '5')
('1456241', '145454', 'Качугский район', '6')
('1456220', '145454', 'Жигаловский район', '6')
('1454692', '145454', 'Усть-Кутский район', '6')
('1454640', '145454', 'Казачинско-Ленский район', '6')
('1454435', '145454', 'Киренский район', '6')
('1456787', '3438290', 'Боханский район', '6')

Last 10 rows:
('5722706', '1454435', 'Небельское сельское поселение', '8')
('5722705', '1454435', 'Макаровское сельское поселение', '8')
('5722704', '1454435', 'Криволукское сельское поселение', '8')
('5722703', '1454435', 'Коршуновское сельское поселение', '8')
('5728528', '1464422', 'Кузнецовское сельское поселение', '8')
('5725774', '1460823', 'Куйтунское городское поселение', '8')
('5826259', '1430613', 'Ленинский административный округ', '9')
('5827226', '1430613', 'Св

In [11]:
df.toPandas().head(5)

Unnamed: 0,OSM_ID,PAR_OSM_ID,NAME,ADMIN_LVL
0,5730494,1445620,Мамское городское поселение,8
1,5728524,1464422,Ключи-Булакское сельское поселение,8
2,5730491,1445620,Витимское городское поселение,8
3,5737571,190110,Костинское сельское поселение,8
4,5728538,1464422,Тэмьское сельское поселение,8


In [12]:
df.toPandas().tail(5)

Unnamed: 0,OSM_ID,PAR_OSM_ID,NAME,ADMIN_LVL
457,5827642,1430613,Октябрьский административный округ,9
458,5826259,1430613,Ленинский административный округ,9
459,5827226,1430613,Свердловский административный округ,9
460,145454,1221148,Иркутская область,4
461,60189,0,Россия,2


### Вывод содердимого parquet

In [13]:
!ls processed-osm.parquet/*.parquet

processed-osm.parquet/part-00000-12289c86-34b6-41ce-992c-62f93e0e8dcf-c000.snappy.parquet
processed-osm.parquet/part-00000-38582742-c9a9-4f1c-bdca-1e5bc5b21e6e-c000.snappy.parquet
processed-osm.parquet/part-00000-3e488b1a-a760-45ac-975b-c059504b207e-c000.snappy.parquet
processed-osm.parquet/part-00000-6834f1bf-40de-45bd-b85c-7d0a5e5186ef-c000.snappy.parquet
processed-osm.parquet/part-00000-b9fab290-4175-4725-9f4b-bc20058b908e-c000.snappy.parquet
processed-osm.parquet/part-00000-db068684-3085-4043-8886-f8cfc77b9a31-c000.snappy.parquet
processed-osm.parquet/part-00000-e365850d-f40e-4bee-aec4-fdefa862c01e-c000.snappy.parquet
processed-osm.parquet/part-00001-12289c86-34b6-41ce-992c-62f93e0e8dcf-c000.snappy.parquet
processed-osm.parquet/part-00001-38582742-c9a9-4f1c-bdca-1e5bc5b21e6e-c000.snappy.parquet
processed-osm.parquet/part-00001-3e488b1a-a760-45ac-975b-c059504b207e-c000.snappy.parquet
processed-osm.parquet/part-00001-6834f1bf-40de-45bd-b85c-7d0a5e5186ef-c000.snappy.parquet