# 1. Эксперимент #

## Блок нейтронной физики ##

In [1]:
import pandas as pd
import random as rd
import math as math
import scipy.constants as const # Модуль физико-математических констант

# отключим предупреждения Anaconda
import warnings
warnings.simplefilter('ignore')

N_A = const.Avogadro

### Функция жизни нейтрона: ###

In [2]:
# Для одинаковой генерации данных
rd.seed(42)

def life_of_neutron(Ein, Eout, dens, A1, A2, Mol_m, Sigma, alpha_1, alpha_2, sigma_1, sigma_2, x1, x2):
    
    Energy = Ein
    colision = 0 # Число столкновений
    time = 0 # Время полета
    a = 0 # Угол полета
    b = 0 # Изменение угла полета
    l = 0 # Длина пролета
    V = 0 # Скорость
    elder = 0 #Возраст
    while Energy > Eout: #Пока энергия нейтрона лежит в области > ... ЭВ
        l = -1*math.log(rd.random())/Sigma/100 # В метрах
        V = (2*Energy*1.6*(10**(-19))/(1.6749*10**(-27)))**0.5 # В м/с
        time += l/V
        colision += 1
        
        #Разыгрывание на каком ядре произошло рассеяние
        x = rd.random()
        if x < (x1*sigma_1/(x1*sigma_1 + x2*sigma_2)):# Рассеяние на первом ядре
            E1 = Energy
            E2 = Energy*alpha_1
            Energy = rd.uniform(E1, E2)
            b = math.acos

            
        else:# Рассеяние на втором ядре
            E1 = Energy
            E2 = Energy*alpha_2
            Energy = rd.uniform(E1, E2)

    return colision, time

### Входные данные по материалам: ###

In [3]:
df = pd.DataFrame({'Material': ['Water', 'Heavy Plexiglass', 'Heavy Water'],
                   'Density g/cm^3': [1.0, 1.6, 1.1],
                    'A_1' : [1, 12, 2],
                    'A_2' : [16, 2, 16],
                    'Molar mass g/mol' : [18, 28, 20],
                    'Chemical formula' : ['H2O', 'C2D2', 'D2O'],
                    'sigma_1 b' : [30.0, 4.9, 3.5],
                    'sigma_2 b' : [4.0, 3.5, 4.0],
                    'x1' : [2, 2, 2],
                    'x2' : [1, 2, 1],
                    
              })

df

Unnamed: 0,Material,Density g/cm^3,A_1,A_2,Molar mass g/mol,Chemical formula,sigma_1 b,sigma_2 b,x1,x2
0,Water,1.0,1,16,18,H2O,30.0,4.0,2,1
1,Heavy Plexiglass,1.6,12,2,28,C2D2,4.9,3.5,2,2
2,Heavy Water,1.1,2,16,20,D2O,3.5,4.0,2,1


### Расчет макросечения: ###

In [4]:
df['\u03A3, 1/cm'] = 0.0
for k in df.index:
    df['Σ, 1/cm'][k] = (df['x1'][k]*df['sigma_1 b'][k]+df['x2'][k]*df['sigma_2 b'][k]
                       )*N_A*df['Density g/cm^3'][k]/df['Molar mass g/mol'][k]/10**(24)
display(df)

Unnamed: 0,Material,Density g/cm^3,A_1,A_2,Molar mass g/mol,Chemical formula,sigma_1 b,sigma_2 b,x1,x2,"Σ, 1/cm"
0,Water,1.0,1,16,18,H2O,30.0,4.0,2,1,2.141206
1,Heavy Plexiglass,1.6,12,2,28,C2D2,4.9,3.5,2,2,0.578126
2,Heavy Water,1.1,2,16,20,D2O,3.5,4.0,2,1,0.36434


### Расчет ступеньки замедления: ###

In [5]:
df['\u0251_1'] = 0.0
df['\u0251_2'] = 0.0
for k in df.index:
    df['\u0251_1'][k] = ((df['A_1'][k]-1)/(df['A_1'][k]+1))**2
    df['\u0251_2'][k] = ((df['A_2'][k]-1)/(df['A_2'][k]+1))**2
display(df)

Unnamed: 0,Material,Density g/cm^3,A_1,A_2,Molar mass g/mol,Chemical formula,sigma_1 b,sigma_2 b,x1,x2,"Σ, 1/cm",ɑ_1,ɑ_2
0,Water,1.0,1,16,18,H2O,30.0,4.0,2,1,2.141206,0.0,0.778547
1,Heavy Plexiglass,1.6,12,2,28,C2D2,4.9,3.5,2,2,0.578126,0.715976,0.111111
2,Heavy Water,1.1,2,16,20,D2O,3.5,4.0,2,1,0.36434,0.111111,0.778547


### Создание DataFrame для записи данных ###

In [6]:
df1 = pd.DataFrame({'Time': [],
                   'Colisions': [],
                    'Material' : [],
                    'Target' : [] 
              })
df1

Unnamed: 0,Time,Colisions,Material,Target


### Эксперимент методом Монте-Карло ###

In [7]:
%%time
Estart = 1_000_000 #1Мэв
Eend = 1 #1 Эв
number_of_event = 4_000
counter = 0 # Счетчик для записи в DataFrame

for k in df.index: #Цикл по материалам
    for i in range(0, number_of_event*(k+1)): #Колличество событий для каждого материала
        colision, time = life_of_neutron(Estart, Eend, df['Density g/cm^3'][k],
                                          df['A_1'][k], df['A_2'][k],
                                          df['Molar mass g/mol'][k], df['Σ, 1/cm'][k],
                                          df['ɑ_1'][k], df['ɑ_2'][k],
                                          df['sigma_1 b'][k], df['sigma_2 b'][k],
                                          df['x1'][k], df['x2'][k],)
        df1.loc[counter] = {'Material': df['Material'][k], 'Time': time, 'Colisions': colision, 'Target': k}
        counter+=1

Wall time: 1min 13s


In [8]:
display(df1)

Unnamed: 0,Time,Colisions,Material,Target
0,6.013426e-07,18.0,Water,0.0
1,1.526499e-07,11.0,Water,0.0
2,5.879935e-07,17.0,Water,0.0
3,4.702673e-07,14.0,Water,0.0
4,1.814087e-07,10.0,Water,0.0
...,...,...,...,...
23995,1.154094e-05,21.0,Heavy Water,2.0
23996,2.636944e-06,23.0,Heavy Water,2.0
23997,8.083488e-06,29.0,Heavy Water,2.0
23998,1.496023e-05,38.0,Heavy Water,2.0


# 2. PySpark vs only pyodbc

## 2.1 Подключение к данным, CREATE TABLE, INSERT, выгрузка данных pyodbc

### 2.1.1 DF to Data Base MS SQL Server медленный способ

In [9]:
#Для корректной работы необходимо создать базу данных через MS SQL Server Management Studio
# имя базы [neutron_moderation_MS_SQL_Server]

import pyodbc
import pandas as pd

server = 'IVANPC' 
database = 'neutron_moderation_MS_SQL_Server' 
username = 'sa' 
password = 'sasa'

Установка и настройка MS SQL Server и Management Studio

https://www.youtube.com/watch?v=dP_ZmYhNFlg&ab_channel=%D0%92%D0%B8%D0%B4%D0%B5%D0%BE%D1%83%D1%80%D0%BE%D0%BA%D0%B8%D0%BF%D0%BE%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D1%8EALEKSEEV74

In [10]:
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = cnxn.cursor()

In [11]:
#Удалить таблицу если такая уже существует

query = '''
DROP TABLE IF EXISTS Output_data
'''
cursor.execute(query)
cnxn.commit()

In [12]:
#создание таблицы Output_data

query = '''
CREATE TABLE [dbo].[Output_data](
	[Time] [float] NOT NULL,
	[Colisions] [int] NOT NULL,
	[Material] [varchar](50) NOT NULL,
	[Target] [int] NOT NULL
)
'''
cursor.execute(query)
cnxn.commit()

In [13]:
#Показать созданную таблицу

query = "SELECT * FROM Output_data;"
df1_from_MS_SQL_Server = pd.read_sql(query, cnxn)
df1_from_MS_SQL_Server

Unnamed: 0,Time,Colisions,Material,Target


In [14]:
cursor = cnxn.cursor()

In [15]:
%%time
#Запись данных

for index,row in df1.iterrows():
    cursor.execute('''
        INSERT INTO Output_data(
        [Time],[Colisions],[Material],[Target]
        ) 
        values (?,?,?,?)''',
                   row['Time'], 
                   row['Colisions'],
                   row['Material'],
                   row['Target']
                  ) 
cnxn.commit()

Wall time: 3.68 s


### 2.1.2 DF to Data Base MS SQL Server быстрый способ

In [16]:
#Удалить таблицу если такая уже существует

query = '''
DROP TABLE IF EXISTS Output_data
'''
cursor.execute(query)
cnxn.commit()

In [17]:
#создание таблицы Output_data

query = '''
CREATE TABLE [dbo].[Output_data](
	[Time] [float] NOT NULL,
	[Colisions] [int] NOT NULL,
	[Material] [varchar](50) NOT NULL,
	[Target] [int] NOT NULL
)
'''

In [18]:
cursor.execute(query)
cnxn.commit()

In [19]:
insert_to_tmp_tbl_stmt = f"INSERT INTO Output_data VALUES (?,?,?,?)"
cursor.fast_executemany = True

In [20]:
%%time
#Запись данных

cursor.executemany(insert_to_tmp_tbl_stmt, df1.values.tolist())
cursor.commit()

Wall time: 359 ms


### 2.1.2 Data Base MS SQL Server to DF

In [21]:
%%time
# Выгрузка данных

query = "SELECT * FROM Output_data;"
df1_from_MS_SQL_Server = pd.read_sql(query, cnxn)
df1_from_MS_SQL_Server

Wall time: 63.8 ms


Unnamed: 0,Time,Colisions,Material,Target
0,6.013426e-07,18,Water,0
1,1.526499e-07,11,Water,0
2,5.879935e-07,17,Water,0
3,4.702673e-07,14,Water,0
4,1.814087e-07,10,Water,0
...,...,...,...,...
23995,1.154094e-05,21,Heavy Water,2
23996,2.636944e-06,23,Heavy Water,2
23997,8.083488e-06,29,Heavy Water,2
23998,1.496023e-05,38,Heavy Water,2


In [22]:
cursor.close()
cnxn.close()

## 2.2 Подключение к данным, INSERT, выгрузка данных PySpark

### 2.2.1 PySpark dataframe

In [23]:
import findspark
findspark.init()

In [24]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Enable Arrow-based spark configuration
spark = SparkSession.builder.getOrCreate()

# Create a Spark DataFrame from a pandas DataFrame using Arrow
spark_df = spark.createDataFrame(df1)

In [25]:
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [26]:
spark_df.printSchema()

root
 |-- Time: double (nullable = true)
 |-- Colisions: double (nullable = true)
 |-- Material: string (nullable = true)
 |-- Target: double (nullable = true)



In [27]:
spark_df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[Time#0,Colisions#1,Material#2,Target#3]




In [28]:
spark_df.describe()

DataFrame[summary: string, Time: string, Colisions: string, Material: string, Target: string]

In [29]:
spark_df.show(3)

+--------------------+---------+--------+------+
|                Time|Colisions|Material|Target|
+--------------------+---------+--------+------+
|6.013426303386577E-7|     18.0|   Water|   0.0|
|1.526499321819623E-7|     11.0|   Water|   0.0|
|5.879934878707969E-7|     17.0|   Water|   0.0|
+--------------------+---------+--------+------+
only showing top 3 rows



In [30]:
spark_df.summary()

DataFrame[summary: string, Time: string, Colisions: string, Material: string, Target: string]

In [31]:
spark_df.head(3)

[Row(Time=6.013426303386577e-07, Colisions=18.0, Material='Water', Target=0.0),
 Row(Time=1.526499321819623e-07, Colisions=11.0, Material='Water', Target=0.0),
 Row(Time=5.879934878707969e-07, Colisions=17.0, Material='Water', Target=0.0)]

In [32]:
spark_df.tail(3)

[Row(Time=8.083487645839396e-06, Colisions=29.0, Material='Heavy Water', Target=2.0),
 Row(Time=1.4960234099482274e-05, Colisions=38.0, Material='Heavy Water', Target=2.0),
 Row(Time=1.095073077721204e-05, Colisions=25.0, Material='Heavy Water', Target=2.0)]

In [33]:
spark_df.registerTempTable("df")
spark.sql("select * from df where Colisions = 20").show(3)

+--------------------+---------+--------+------+
|                Time|Colisions|Material|Target|
+--------------------+---------+--------+------+
|5.209425934209706E-7|     20.0|   Water|   0.0|
| 1.50359944963643E-6|     20.0|   Water|   0.0|
|1.134677788023007...|     20.0|   Water|   0.0|
+--------------------+---------+--------+------+
only showing top 3 rows



In [34]:
spark.stop()

### 2.2.2 DF PySpark from MS SQL Server

In [35]:
import findspark
findspark.init()

In [36]:
from pyspark import SparkContext, SparkConf, SQLContext
import pyodbc
import pandas as pd

appName = "PySpark SQL Server Example - via ODBC"
master = "local"
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) 
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [37]:
server = 'IVANPC' 
database = 'neutron_moderation_MS_SQL_Server' 
username = 'sa' 
password = 'sasa'
table = "dbo.Output_data"
conn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)

query = f"SELECT TOP(3) * FROM {table}"
pdf = pd.read_sql(query, conn)
sparkDF =  spark.createDataFrame(pdf)
sparkDF.show()

+--------------------+---------+--------+------+
|                Time|Colisions|Material|Target|
+--------------------+---------+--------+------+
|6.013426303386577E-7|       18|   Water|     0|
|1.526499321819623E-7|       11|   Water|     0|
|5.879934878707969E-7|       17|   Water|     0|
+--------------------+---------+--------+------+



In [38]:
spark.stop()

### 2.2.3 DF PySpark to MS SQL Server

In [39]:
from pyspark import SparkContext, SparkConf, SQLContext
import pyodbc
import pandas as pd

appName = "PySpark SQL Server Example - via ODBC"
master = "local"
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) 
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession