# Задание

# Работа с источниками данных в Apache Spark
Изучите теоретическую часть. 

Ячейки ниже скачивают и настраивают две СУБД. 

Задача состоит в том, что бы создать в postgresql таблицы приёмники (target table) и заполнить данные в этих таблицах из таблиц источников (source table) в sqlite.

**Подсказки:** 
1. При создании таблиц вместо [типа](https://www.sqlite.org/datatype3.html) `NVARCHAR` используйте [тип](https://postgrespro.ru/docs/postgrespro/10/datatype) `VARCHAR`, а вместо типа `DATETIME` используйте тип `VARCHAR(100)`.
2. Если вам потребуется преобразовывать типы данных в sqlite это делается вот [так](https://www.sqlite.org/lang_expr.html#castexpr).
3. Для вывода списка таблиц предлагается использовать для [sqlite](https://www.sqlite.org/schematab.html) и для [postgresql](https://www.postgresql.org/docs/8.0/view-pg-tables.html)
4. В sqlite вы можете воспользоваться конструкцией `SELECT * FROM pragma_table_info(table_name)` для вывода типов и названий атрибутов таблицы




In [1]:
# Устанавливаем OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Закачиваем Spark
!wget -q http://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz -O spark.tgz
# Распаковываем архив со Spark
!tar xf spark.tgz
# Устанавливаем пакет findspark для работы со Spark из Python
!pip install -q findspark
# Настраиваем переменные окружения для работы с Apache Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"
# Настраиваем драйверы БД для работы Apache Spark с sqllite
!wget https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar -q
!wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.23/postgresql-42.2.23.jar -q
!echo 'spark.jars /content/sqlite-jdbc-3.34.0.jar,/content/postgresql-42.2.23.jar' >> '/content/spark-3.2.0-bin-hadoop2.7/conf/spark-defaults.conf'
# Находим установку Spark
import findspark
findspark.init()
# Подключаем необходимые модули для работы со Spark из Python
from pyspark.sql import SparkSession
# Создаем сессию Spark на локальном компьютере
spark = SparkSession.builder.master("local[*]").getOrCreate()
!wget https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip -O 'sample_data/chinook.zip' -q
!unzip "sample_data/chinook.zip" -d "sample_data/"

Archive:  sample_data/chinook.zip
  inflating: sample_data/chinook.db  


**Создадим базу данных, схему БД**

In [2]:
!apt-get update &>log0 && apt install postgresql postgresql-contrib &>log1
!service postgresql start
!sudo -u postgres psql -c 'CREATE DATABASE mydatabase'
!sudo -u postgres psql -c "CREATE USER me WITH ENCRYPTED PASSWORD 'mypass'"
!sudo -u postgres psql -c 'CREATE SCHEMA AUTHORIZATION "me";'
!sudo -u postgres psql -c 'GRANT ALL PRIVILEGES ON DATABASE mydatabase TO me'

 * Starting PostgreSQL 10 database server
   ...done.
CREATE DATABASE
CREATE ROLE
CREATE SCHEMA
GRANT


## Задача 1

Модернизируйте заготовку заменив все участки ```<put your code here>``` на ваш код для того, что бы:
* в mydatabase должны создасться 11 таблиц
* вывод проверочного скрипта был примерно такой:

```
public.genres
Rows count: 25
public.playlist_track
Rows count: 8715
public.invoices
Rows count: 412
public.tracks
Rows count: 3503
public.albums
Rows count: 347
public.invoice_items
Rows count: 2240
public.artists
Rows count: 275
public.media_types
Rows count: 5
public.customers
Rows count: 59
public.employees
Rows count: 8
public.playlists
Rows count: 18
```



**Выполним задание**

In [3]:
from sqlalchemy import create_engine

sqlite = create_engine("sqlite:///sample_data/chinook.db") #движок для подключений к sqlite
postgresql = create_engine("postgresql://me:mypass@localhost/mydatabase") #движок для подключений к postgresql
jdbcDriverLite = 'org.sqlite.JDBC'
jdbcUrlLite = 'jdbc:sqlite:sample_data/chinook.db'
jdbcUrlPost = 'jdbc:postgresql://localhost/mydatabase?user=me&password=mypass'
jdbcDriverPost= 'org.postgresql.Driver'

with sqlite.connect() as lite, lite.begin(), postgresql.connect() as post, post.begin(): #контекст соединения
  cursor = lite.execute('''SELECT name, sql FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';''')
  for row in cursor:
    tab = row['name'] 
    fields = [] 
    cols = []
    innerCursor = lite.execute(f'''SELECT name, type FROM pragma_table_info('{tab}') c''')
    for innerRow in innerCursor:
      fields.append(innerRow['name']+ ' ' + str(innerRow['type']).replace('NVARCHAR', 'VARCHAR').replace('DATETIME','VARCHAR(100)'))    
      cols.append('cast(' + innerRow['name'] + ' as text)' if innerRow['type'] == 'DATETIME' else innerRow['name'])
    ddl = f"CREATE TABLE IF NOT EXISTS {tab} (\n" + ",\n".join(fields) + ");"
    postgresql.execute(ddl)
    print('(select ' + ", ".join(cols) + f' from {tab}) as {tab}')
    df = spark.read.format('jdbc').options(driver=jdbcDriverLite, dbtable='(select ' + ", ".join(cols) + f' from {tab}) as {tab}', url=jdbcUrlLite).load()
    #df.show()
    df.write.format("jdbc").option("url", jdbcUrlPost).option("driver",jdbcDriverPost).option("dbtable", tab).mode("overwrite").save()

(select AlbumId, Title, ArtistId from albums) as albums
(select ArtistId, Name from artists) as artists
(select CustomerId, FirstName, LastName, Company, Address, City, State, Country, PostalCode, Phone, Fax, Email, SupportRepId from customers) as customers
(select EmployeeId, LastName, FirstName, Title, ReportsTo, cast(BirthDate as text), cast(HireDate as text), Address, City, State, Country, PostalCode, Phone, Fax, Email from employees) as employees
(select GenreId, Name from genres) as genres
(select InvoiceId, CustomerId, cast(InvoiceDate as text), BillingAddress, BillingCity, BillingState, BillingCountry, BillingPostalCode, Total from invoices) as invoices
(select InvoiceLineId, InvoiceId, TrackId, UnitPrice, Quantity from invoice_items) as invoice_items
(select MediaTypeId, Name from media_types) as media_types
(select PlaylistId, Name from playlists) as playlists
(select PlaylistId, TrackId from playlist_track) as playlist_track
(select TrackId, Name, AlbumId, MediaTypeId, Genre

**Проверочный скрипт:** 

In [4]:
from sqlalchemy import create_engine

postgresql = create_engine("postgresql://me:mypass@localhost/mydatabase")

with postgresql.connect() as post, post.begin():
  cursor = post.execute('''SELECT * FROM pg_catalog.pg_tables where schemaname = 'public';''')
  for row in cursor:
    print(row['schemaname'] + '.' + row['tablename'])
    innerCursor = post.execute(f'''SELECT count(*) cnt FROM {row['schemaname']}.{row['tablename']};''')
    for innerRow in innerCursor:
      print(f'''Rows count: {innerRow['cnt']}''')      

public.invoices
Rows count: 412
public.tracks
Rows count: 3503
public.albums
Rows count: 347
public.invoice_items
Rows count: 2240
public.artists
Rows count: 275
public.media_types
Rows count: 5
public.customers
Rows count: 59
public.playlists
Rows count: 18
public.employees
Rows count: 8
public.genres
Rows count: 25
public.playlist_track
Rows count: 8715
