In [1]:
import numpy as np
import random
import pandas as pd
from matplotlib import pyplot as plt
from functools import reduce
%matplotlib inline

In [2]:
cruise_excel_path = 'cruise.xlsx'
probability_excel_path = 'probability.xlsx'


number_of_passengers = 100 #승객 수(<= 호실 수)
dt = 2 #시간 간격(초)
walking_speed = 3/3.6 #3km/h
E, Is, Ia, R = 10, 0, 0, 0 #상태는 S(감염대상군), E(접촉군), Is(감염군-유증상), Ia(감염군-무증상), R(회복군)로 나뉨
S = number_of_passengers - sum([E,Is,Ia,R])
incubation_period, infection_period = round(5.1*86400), round(20.1*86400) #잠복기 5.1일, 치료기간 20.1일
transmissibility = {'Is': 0.063, 'Ia': 0.041}
p_asymptomatic = 0.3 #무증상자 비율
radius = 2 #밀접 접촉에 의한 감염이 일어날 수 있는 반경 2m

### 선실 dataframe 만들기

In [3]:
top_floor = 5 #크루즈가 몇 층짜리인지
xls = pd.ExcelFile(cruise_excel_path) #엑셀 파일을 불러옴
cruise = pd.concat([pd.read_excel(xls, sheet_name=i) for i in range(top_floor)], axis=0) #각 시트에 있는 내용을 다 합침
cruise.reset_index(drop=True, inplace=True)
cruise.decayTime = np.inf #감염자가 방을 떠난 후 경과한 시간

In [5]:
cruise.loc[cruise.floor.values==1]

Unnamed: 0,category,floor,ID,x,y,desc,mask
0,승객X,1,101,180.0,15,조종실,1
1,대형편의,1,102,60.0,15,기념품,1
2,대형편의,1,103,80.0,13,서비스,1
3,대형편의,1,104,80.0,17,게임센터,1
4,소형편의,1,105,101.25,14,화장실,1
5,소형편의,1,106,101.25,16,화장실,1
6,소형편의,1,107,110.75,13,노래방,1
7,소형편의,1,108,110.75,17,ICT실,1
8,대형편의,1,109,125.0,13,고객센터,1
9,대형편의,1,110,125.0,17,응급실,1


### 승객 dataframe 만들기

In [4]:
ID = np.arange(1, number_of_passengers+1) #승객 고유번호
age = np.round(np.random.randn(number_of_passengers)*((72-63)/1.35)+69) #승객 나이는 정규분포를 따른다고 가정(IQR(사분범위)(=72-63), 중앙값(=69)이 주어짐, 정규분포에서 중앙값=평균이고, 표준편차=IQR/1.35)
print(f"탑승객의 수: {number_of_passengers}명, 탑승객의 평균 연령: {np.mean(age)}세")

탑승객의 수: 100명, 탑승객의 평균 연령: 70.1세


In [5]:
sample_rooms = cruise.loc[cruise.category.values=='객실'].sample(n=number_of_passengers) #객실 중에서 승객 수만큼을 무작위로 추출함
home = sample_rooms.ID
floor, x, y = sample_rooms.floor, sample_rooms.x, sample_rooms.y #승객의 현재 층수, x, y좌표는 자기 방
behavior = ['객실'] * number_of_passengers #현재 하고 있는 행동(밤 12시에는 자신의 호실에 있어야 한다)
is_moving = [False] * number_of_passengers #현재 이동중인지 여부(True or False)
time_left = [0] * number_of_passengers #현재 하고 있는 행동이 끝나기까지 남은 시간
state = np.array(['S']*S + ['E']*E + ['Is']*Is + ['Ia']*Ia + ['R']*R) #상태는 S(감염대상군), E(접촉군), Is(감염군-유증상), Ia(감염군-무증상), R(회복군)로 나뉨
latency = np.array([np.inf]*S + [incubation_period]*E + [infection_period]*(Is+Ia) + [np.inf]*R) #E->I, I->R로 넘어가는 데 필요한 시간(기본적으로 S이면 초기값 무한대(np.inf), E이면 초기값은 잠복기)
#latency는 np.inf가 float로 취급되어서 모든 행의 값이 float로 취급되는데, 그래서 부동소수점 연산을 할 때 생기는 오차에 유의해야 함
indices = np.arange(number_of_passengers)
np.random.shuffle(indices)
state = state[indices]
latency = latency[indices]

In [6]:
passengers = pd.DataFrame({'ID':ID,
                           'age':age,
                           'home':home,
                           'floor':floor,
                           'x':x,
                           'y':y,
                           'behavior':behavior,
                           'is_moving':is_moving,
                           'time_left':time_left,
                           'state':state,
                           'latency':latency})
passengers.reset_index(drop=True, inplace=True)
passengers #승객은 아래와 같음

Unnamed: 0,ID,age,home,floor,x,y,behavior,is_moving,time_left,state,latency
0,1,84.0,234,2,167.5,14,객실,False,0,S,inf
1,2,70.0,248,2,42.5,16,객실,False,0,E,440640.0
2,3,77.0,277,2,187.5,16,객실,False,0,S,inf
3,4,75.0,405,4,22.5,14,객실,False,0,S,inf
4,5,66.0,441,4,17.5,16,객실,False,0,S,inf
...,...,...,...,...,...,...,...,...,...,...,...
95,96,72.0,211,2,52.5,14,객실,False,0,S,inf
96,97,86.0,408,4,37.5,14,객실,False,0,S,inf
97,98,78.0,446,4,42.5,16,객실,False,0,S,inf
98,99,68.0,265,2,127.5,16,객실,False,0,S,inf


### 최단경로 찾기

In [7]:
def get_linear_path(startRoom, endRoom): #같은 층에 있을 때 이동경로 찾아줌. 단 경로에 끝점은 포함되나 시작점은 포함되지 않음.
    path = []
    floor = startRoom.floor
    x = startRoom.x
    y = cruise.loc[(cruise.floor.values==startRoom.floor) & (cruise.desc.values=='계단')].y.squeeze()
    if startRoom.x < endRoom.x:
        while True:
            x += walking_speed * dt
            if x < endRoom.x:
                path.append((floor, x, y-0.5)) #우측통행 고려
                continue
            break
    elif startRoom.x > endRoom.x:
        while True:
            x -= walking_speed * dt
            if x > endRoom.x:
                path.append((floor, x, y+0.5))
                continue
            break        
    path.append((floor, endRoom.x,endRoom.y))
    return path

In [8]:
def navigate(startPoint, endPoint): #입력값은 두 방의 ID, 두 방 사이를 이동하는 경로를 리스트로 리턴해줌.
    startRoom = cruise.loc[cruise.ID.values==startPoint].squeeze()
    endRoom = cruise.loc[cruise.ID.values==endPoint].squeeze()
    path = [(startRoom.floor, startRoom.x, startRoom.y)]
    
    if startRoom.floor == endRoom.floor:
        path += get_linear_path(startRoom, endRoom)    
    else:
        floor = startRoom.floor
        path += get_linear_path(startRoom, cruise.loc[(cruise.floor.values==floor) & (cruise.desc.values=='계단')].squeeze())
        if startRoom.floor < endRoom.floor:
            while floor < endRoom.floor:
                floor += 1
                x, y = cruise.loc[(cruise.floor.values==floor) & (cruise.desc.values=='계단')].squeeze()[['x','y']]
                path += [(floor, x, y)]
        else:
            while floor > endRoom.floor:
                floor -= 1
                x, y = cruise.loc[(cruise.floor.values==floor) & (cruise.desc.values=='계단')].squeeze()[['x','y']]
                path += [(floor, x, y)]
                
        path += get_linear_path(cruise.loc[(cruise.floor.values==floor) & (cruise.desc.values=='계단')].squeeze(),
                                endRoom)

    return path

### 승객 이동패턴 정의하기(가장 많은 가정이 필요한 부분)
- 승객들이 방문할 수 있는 시설에는
1. 1층 기념품, 서비스, 게임센터, 노래방, ICT실, 고객센터, 볼링장, 당구장, 화장실
2. 2층 편의점, 카페(2층 투숙객들만 방문)
3. 3층 식당(1\~5), 카페(1\~4 -> 식사 후 일정 확률)
4. 4층 편의점, 카페(2층 투숙객들만 방문)
5. 5층 전망대, 사우나, 야외 공연장

### 승객 패턴을 정의하기 위한 가정들
1. 각각의 활동을 하는 데 걸리는 시간은 아래와 같다.
- 5층: 영화 2시간, 사우나 4시간, 전망대 30분
- 3층: 아침 30분, 점심 30분, 저녁 **1시간** 
- 2,4층: 카페 2시간, 편의점 15분, 
- 1층: 볼링 1시간, 당구 1시간, 노래방 90분, 카페 2시간, ICT실 2시간, 서비스센터 15분

## 오늘 만들어야 할 함수


### (1) 각 장소별 방문 확률 불러오기



In [9]:
xls = pd.ExcelFile(probability_excel_path)
probability = pd.read_excel(xls, sheet_name=0) #특정 시간대에 어떤 활동을 할 확률
frequency = pd.read_excel(xls, sheet_name=1).fillna(value=np.inf) #하루에 어떤 활동을 하는 최대 횟수
required_time = pd.read_excel(xls, sheet_name=2) #활동을 하는 데 걸리는 최소, 최대, 기준 시간

In [10]:
all_behaviors = probability.iloc[:,0].tolist()
all_behaviors_and_places = {behavior:behavior for behavior in all_behaviors if behavior not in ['아침', '점심', '저녁']} #활동과 장소 이름이 다른 경우 제외
all_behaviors_and_places.update({behavior:'식당' for behavior in ['아침', '점심', '저녁']}) #다시 포함

In [11]:
pending_path = {ID+1:[] for ID in range(number_of_passengers)}
done_behaviors = {ID+1:[0]*len(all_behaviors) for ID in range(number_of_passengers)}

### (2) 어디로 갈 지 확률적으로 결정하기
1. time_left가 0인 승객을 passengers.loc[] 써서 뽑아낸다.
2. 그 승객들 각각에 대해 아래 과정을 반복한다.
    * 지금이 몇 시인지 (전역변수 t//3600으로 구함)를 기준으로 각각의 장소로 갈 확률을 구한다.
    * 다음 장소를 선택하고, 그 장소에 최대 방문 횟수 체크해서
    * 적당히 반복하면 다음 장소를 고를 수 있다.
    * 다음 장소가 골라지면, 현재 장소와 다음 장소를 잇는 경로를 만들어서 pending_path\[ID]에 넣는다.
    * 승객의 time_left를 다음 장소의 활동 소요 시간으로 설정한다.
    * done_behaviors\[ID]\[behavior]에 1을 더한다.



In [12]:
def where_to_move(people=passengers):
    hour = round(t//3600)%24
    probs = probability.loc[:, hour].tolist() #해당 시간대 모든 활동의 확률 불러오기
    all_behaviors_and_probs = {behavior:probs[index] for index, behavior in enumerate(all_behaviors)}
    people_to_move = people.loc[people.time_left.values==0, 'ID']

    for ID in people_to_move:
        available_choices = [[index]*round(all_behaviors_and_probs[behavior]*10000) for index, behavior in enumerate(all_behaviors)]
        available_choices = reduce(lambda a,b: a+b, available_choices)

        while True:
            next_behavior_index = np.random.choice(available_choices) #1개 뽑기
            if done_behaviors[ID][next_behavior_index] < frequency.iloc[next_behavior_index, 1]: #해당 활동의 일일 최대횟수를 넘어가면 안 함
                break 

        done_behaviors[ID][next_behavior_index] += 1
        next_behavior = all_behaviors[next_behavior_index]
        
        next_time_min, next_time_max = required_time.iloc[next_behavior_index, [1,2]]
        next_time_left = round(np.random.uniform(low=next_time_min, high=next_time_max)*60/dt)*dt #모든 활동은 dt 단위     

        homeID, floor, x, y, now_behavior = people.loc[people.ID.values==ID, ['home', 'floor','x','y', 'behavior']].squeeze().tolist()
        now_roomID = cruise.loc[(cruise.floor.values==floor) & (cruise.x==x) & (cruise.y==y), 'ID'].squeeze()
        

        if now_behavior == next_behavior:
            will_move = 0
        else:
            will_move = 1
            if next_behavior == '객실':
                next_roomID = homeID 
            else:
                next_roomIDs = cruise.loc[cruise.desc.str.contains(all_behaviors_and_places[next_behavior])].ID.to_numpy()
                if next_behavior == '카페':
                    next_roomIDs = next_roomIDs[next_roomIDs//100!=( {2:4, 4:2}.get(homeID//100) )]
                if len(next_roomIDs)==0: print(next_behavior, homeID)
                next_roomID = np.random.choice(next_roomIDs)

            pending_path[ID] = navigate(now_roomID, next_roomID) 
        people.loc[people.ID.values==ID, ['behavior', 'is_moving', 'time_left']] = [next_behavior, will_move, next_time_left] 
    return

### (3) 각 승객 이동시키기
1. pending_path\[ID]가 비어있지 않은 승객을 뽑아낸다.
2. 그 각각의 승객에 대해 아래 과정을 수행한다.
    * next_floor, next_x, next_y = pending_path\[ID].pop(0)
    * floor, x, y를 위 세 개로 설정

In [13]:
def move(people=passengers): #이전 함수는 실행 시간이 느려, 효율적인 itertuples로 바꿈
    for person in people.loc[people.is_moving.values==True].itertuples():
        ID = person.ID
        next_floor, next_x, next_y = pending_path[ID].pop(0)
        people.loc[people.ID.values==ID, ['floor', 'x', 'y']] = next_floor, next_x, next_y
        if pending_path[ID] == []:
            people.loc[people.ID.values==ID, 'is_moving'] = False

### (4) 감염자 기준으로 거리 계산하기, 감염되는지 확인하기

In [14]:
infection_history = []

In [15]:
def hall_infection(people=passengers):
    all_susceptible = people.loc[(people.state.values=='S') & (people.is_moving.values==1), ['ID', 'floor', 'x', 'y']] #움직이는 감염 대상군(dataframe)
    all_infected = people.loc[(people.state.str.contains("Is|Ia")) & (people.is_moving.values==1), ['ID', 'floor', 'x', 'y', 'state']] #움직이는 감염자(dataframe)

    for infected in all_infected.itertuples(): #각각의 움직이는 감염자에 대해 반복
        nearby_susceptible = all_susceptible.loc[(all_susceptible.floor==infected.floor) & (np.linalg.norm((all_susceptible.x.values-infected.x, all_susceptible.y.values-infected.y), axis=0)<radius)]
        for ID in nearby_susceptible.ID:
            if np.random.rand() < transmissibility[infected.state]:
                print(f"{ID}번 사람이 감염되었습니다.")
                infection_history.append( (infected.ID,ID) )
                people.loc[people.ID.values==ID, ["state", "latency"]] = ["E", incubation_period]

## 시뮬레이션하기

In [16]:
stats = {'hour':[], 'S':[], 'E':[], 'Is':[], 'Ia':[], 'R':[]}

def change_state(people=passengers): #각 단계별로 지연 기간(잠복기, 치료 기간)이 끝난 승객들을 다음 상태로 바꿈
    #E -> Ia or Is
    exposed_to_infected = people.loc[(people.state.values=='E') & (people.latency.values==0), 'ID']
    for ID in exposed_to_infected:
        if np.random.rand() <= p_asymptomatic: #np.random.rand: [0,1)의 난수 생성
            people.loc[people.ID.values==ID, 'state'] = 'Ia'
        else:
            people.loc[people.ID.values==ID, 'state'] = 'Is'
        people.loc[people.ID.values==ID, 'latency'] = infection_period
    
    # Ia or Is -> R
    infected_to_recovered = people.loc[(people.state.str.contains('Is|Ia')) & (people.latency.values==0), 'ID']
    people.loc[people.ID.isin(infected_to_recovered), 'state'] = 'R'
    people.loc[people.ID.isin(infected_to_recovered), 'latency'] = np.inf
    
def append_result(t, stats=stats): #매 시간마다 결과값을 리스트에 넣는 함수
    counts = passengers.state.value_counts()
    stats['hour'].append(t//3600)
    for state in ['S','E','Is','Ia','R']:
        stats[state].append(counts.get(state, default=0))



In [None]:
t = 0 #초기 시각
t_max = 86400*30 #최대 시뮬레이션 시간(일단 30일)

while t <= t_max:
    #사람 상태 갱신
    change_state()
    
    #사람 이동
    where_to_move()
    move()

    #접촉감염 여부 검사
    hall_infection()

    #시간별로 결과 저장
    if t%3600==0: append_result(t)
    if t%3600==0:
        S, E, Is, Ia, R = [stats[state][-1] for state in ['S','E','Is','Ia','R']]
        print(f"Hour {t//3600} ended; S({S}), E({E}), Is({Is}), Ia({Ia}), R({R})")
    
    #시간 변화
    t += dt
    passengers.loc[:, ['time_left','latency']] -= dt
    if t%86400==0: done_behaviors = {ID+1:[0]*len(all_behaviors) for ID in range(number_of_passengers)}
    
stats = pd.DataFrame(stats)    

## 감염자 수 변화 시각화

In [None]:
#SEIR
plt.rc('font', family='NanumGothic') # For Windows
plt.style.use('fivethirtyeight')

plt.plot(stats.hour/24, stats.S/number_of_passengers, c='blue') #감염대상군은 파란색
plt.plot(stats.hour/24, stats.E/number_of_passengers, c='yellow') #접촉군은 노란색
plt.plot(stats.hour/24, (stats.Ia+stats.Is)/number_of_passengers, c='red') #감염군은 빨간색(무증상, 유증상 합친 수치)
plt.plot(stats.hour/24, stats.R/number_of_passengers, c='green') #회복군은 초록색
plt.legend(['S','E','I','R']) #범례 표시
plt.xlabel('시뮬레이션 시간(일)')
plt.ylabel('비율', rotation=0)
plt.ylim(0,1)
plt.show()

In [None]:
#SEI2R

plt.plot(stats.hour/24, stats.S/number_of_passengers, c='blue') #감염대상군은 파란색
plt.plot(stats.hour/24, stats.E/number_of_passengers, c='yellow') #접촉군은 노란색
plt.plot(stats.hour/24, stats.Is/number_of_passengers, c='orange')
plt.plot(stats.hour/24, stats.Ia/number_of_passengers, c='red') 
plt.plot(stats.hour/24, stats.R/number_of_passengers, c='green') #회복군은 초록색
plt.legend(['S','E','Is','Ia','R']) #범례 표시
plt.xlabel('시뮬레이션 시간(일)')
plt.ylabel('비율', rotation=0)
plt.ylim(0,1)
plt.show()