In [None]:
# Reset the output directory: create it when it is missing, otherwise delete
# the .jpg files directly inside it. A leftover ./output.zip is removed too.

import os

if not os.path.exists("./output"):
    os.makedirs("./output")
else:
    listdir = os.listdir("./output")
    for f in listdir:
        if not f.endswith(".jpg"):
            continue  # only JPEG frames are cleared; everything else stays
        os.remove(f"./output/{f}")

# Drop the archive of a previous run so a stale zip is never picked up.
if os.path.exists("./output.zip"):
    os.remove("./output.zip")


In [None]:
# 카메라 입력 받아온 후 녹화

try:
    import jchm
except ModuleNotFoundError:
    pass
import cv2
import os
import time
from contextlib import suppress
from enum import IntEnum
from typing import Callable

class CameraMode(IntEnum):
    """Selects which camera backend a Camera instance talks to."""

    # Jajucha car camera, reached through the jchm module.
    JAJUCHA = 0
    # Camera attached to the local computer, reached through OpenCV.
    COMPUTER = 1

class Camera:
    """Uniform grab/show/save wrapper over two camera backends.

    In ``CameraMode.JAJUCHA`` mode frames are read from ``jchm.camera`` (the
    optional file-level ``import jchm`` must have succeeded); in
    ``CameraMode.COMPUTER`` mode they are read from ``cv2.VideoCapture``.
    After construction ``getFrame()`` returns one frame and ``showFrame(mat)``
    displays one, each bound to the implementation of the selected backend.
    """

    def __init__(self, mode: CameraMode, device):
        """Bind the backend for ``mode`` and select ``device``.

        Args:
            mode: Backend selector; must be a ``CameraMode`` member.
            device: Device name (``str``) in JAJUCHA mode, camera index
                (``int``) in COMPUTER mode.

        Raises:
            TypeError: ``mode`` is not a ``CameraMode``.
            RuntimeError: Any failure during backend setup (missing ``jchm``,
                wrong ``device`` type, camera that cannot be opened); the
                original error is chained as ``__cause__``.
        """
        if not isinstance(mode, CameraMode):
            raise TypeError(f"mode는 CameraMode여야 합니다. got {type(mode).__name__}")
        self.mode = mode

        try:
            if mode is CameraMode.JAJUCHA:
                try:
                    self.camera = jchm.camera
                except NameError as e:
                    # jchm is only bound when the optional import at file level succeeded.
                    raise ImportError("jchm 모듈을 사용할 수 없습니다.") from e
                self.set_camera_device(device)
                self.getFrame: Callable[[], object] = self.__getJajuchaFrame
                self.showFrame: Callable[[object], None] = self.__showFrameOnJajucha

            elif mode is CameraMode.COMPUTER:
                self.set_camera_device(device)
                if not self.camera.isOpened():
                    raise RuntimeError(f"카메라를 인식할 수 없습니다. (index {self.device})")
                self.getFrame = self.__getComputerFrame
                self.showFrame = self.__showFrameOnComputer

            else:
                raise ValueError(f"사용불가한 mode: {mode}")

        except Exception as e:
            # Every setup failure is reported as one RuntimeError; the cause stays chained.
            raise RuntimeError("카메라 초기화 도중 예기치 않은 오류가 발생했습니다.") from e

    # --- public helpers ---

    def set_camera_device(self, new_device) -> None:
        """Select the device frames are read from; may be called at runtime.

        JAJUCHA mode expects a device name (``str``), COMPUTER mode a camera
        index (``int``). In COMPUTER mode a previously opened capture is
        released before the new one is opened.

        Raises:
            TypeError: ``new_device`` has the wrong type for the current mode.
        """
        if self.mode is CameraMode.JAJUCHA:
            # Exact type checks (not isinstance) are kept so subclasses such as
            # bool are still rejected, as before.
            if type(new_device) is not str:
                raise TypeError("자주차 카메라의 디바이스 이름은 문자열이여야 합니다.")
            self.camera = jchm.camera
            # The Jajucha frame getter reads self.device, so it has to be
            # recorded in this mode as well.
            self.device = new_device
        elif self.mode is CameraMode.COMPUTER:
            if type(new_device) is not int:
                raise TypeError("컴퓨터 카메라의 디바이스 이름은 정수여야 합니다.")
            if hasattr(self, "camera") and self.camera is not None:
                self.release()
            self.device = new_device
            self.camera = cv2.VideoCapture(new_device)

    def release(self) -> None:
        """Release the OpenCV capture and close OpenCV windows (COMPUTER mode only)."""
        if self.mode is CameraMode.COMPUTER and hasattr(self, "camera") and self.camera is not None:
            self.camera.release()
            # Closing windows is best effort: OpenCV builds without GUI support
            # raise cv2.error here, which must not break cleanup.
            try:
                cv2.destroyAllWindows()
            except cv2.error:
                pass

    def saveFrame(self, mat, output_dir: str, filename: str, jpg_compress_quality: int):
        """Write ``mat`` to ``output_dir/filename`` with the given JPEG quality.

        Raises:
            NotADirectoryError: ``output_dir`` is missing or is not a directory.
            OSError: OpenCV reported that the image could not be written.
        """
        if not os.path.isdir(output_dir):
            raise NotADirectoryError("output 디렉토리를 찾지 못했습니다.")
        target = os.path.join(output_dir, filename)
        # imwrite signals failure through its return value, not an exception;
        # ignoring it would silently drop frames.
        if not cv2.imwrite(target, mat, [cv2.IMWRITE_JPEG_QUALITY, jpg_compress_quality]):
            raise OSError(f"프레임을 저장하지 못했습니다: {target}")

    # --- JAJUCHA backend ---

    def __getJajuchaFrame(self):
        """Return one frame from the Jajucha camera for the selected device name."""
        return self.camera.get_image(self.device)

    def __showFrameOnJajucha(self, mat):
        """Hand ``mat`` to the Jajucha camera's ``show_image`` with the "center" argument."""
        self.camera.show_image(mat, "center")

    # --- COMPUTER backend ---

    def __getComputerFrame(self):
        """Read one frame from the OpenCV capture.

        Raises:
            RuntimeError: The capture did not deliver a frame.
        """
        ret, frame = self.camera.read()
        if not ret:
            raise RuntimeError(f"카메라를 인식할 수 없습니다. (index {self.device})")
        return frame

    def __showFrameOnComputer(self, mat):
        """Show ``mat`` in the OpenCV window named "center"."""
        cv2.imshow("center", mat)
        # HighGUI only repaints while its event loop is pumped; without a
        # waitKey call after imshow the window is never updated.
        cv2.waitKey(1)

def main():
    """Record frames from the local camera into ./output until Ctrl+C.

    Frames are captured at FPS frames per second, shown in a preview window
    and written as ``<n>.jpg`` (n counting up from 1). The camera is released
    when the loop ends, whether by Ctrl+C or by an error.
    """
    OUTPUT_DIR = "./output"
    FPS = 1
    DT = 1.0 / FPS  # seconds between two scheduled captures
    LOCAL_CAMERA_INDEX = 0
    JPG_COMPRESS_QUALITY = 80

    camera = Camera(
        mode=CameraMode.COMPUTER,
        device=LOCAL_CAMERA_INDEX
    )

    try:
        with suppress(KeyboardInterrupt):
            cnt = 0
            next_frame = time.perf_counter()
            while True:
                # next_frame is the scheduled time of the upcoming capture.
                next_frame += DT
                now = time.perf_counter()
                if now < next_frame:
                    time.sleep(next_frame - now)
                else:
                    # Already at or past the slot: move the schedule forward by
                    # whole periods until it lies after `now`, then capture
                    # right away.
                    missed = int((now - next_frame) // DT) + 1
                    next_frame += missed * DT
                cnt += 1
                img = camera.getFrame()
                camera.showFrame(img)
                camera.saveFrame(img, OUTPUT_DIR, f"{cnt}.jpg", JPG_COMPRESS_QUALITY)
    finally:
        # Without this the capture device stays open and the preview window
        # stays on screen after the loop ends.
        camera.release()

if __name__ == "__main__":
    main()



In [None]:
# Compress the output directory into ./output.zip.

import shutil

# IPython shell escape (only valid inside a notebook / IPython session): set
# the timestamp of every entry under ./output to 2000-01-01 00:00 before
# archiving.
# NOTE(review): presumably this guards against files whose mtime predates 1980,
# which the ZIP format cannot store — confirm against the recording device's clock.
!cd output && find . -exec touch -t 200001010000 {} \;

# Creates ./output.zip from the contents of ./output/.
shutil.make_archive("./output", "zip", "./output/")


In [None]:
#!/usr/bin/env python3
# 파일 HTTP 정적 서빙 + 업로드(POST /upload, PUT /<filename>)

from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
from functools import partial
import os, re, shutil, time

# Server settings.
BIND = "0.0.0.0"  # address the HTTP server binds to
PORT = 8000
DIR = os.getcwd()  # directory that is served and that receives uploads
MAX_UPLOAD = 1 << 30  # upload size limit in bytes (1 GiB)

def safe_name(name: str) -> str:
  """Reduce a client-supplied name to a plain file name that is safe to create.

  Only the last path component is kept, and every run of characters outside
  ``A-Z a-z 0-9 . _ -`` is collapsed into a single underscore. When nothing
  usable remains, a time-based ``upload_<unix seconds>`` name is returned.

  Args:
    name: File name or path as sent by the client.

  Returns:
    A non-empty file name without directory components.
  """
  name = os.path.basename(name)
  name = re.sub(r'[^A-Za-z0-9._-]+', '_', name)
  # "." and ".." pass the character filter but refer to directories, so they
  # are treated like an empty name.
  if name in ('', '.', '..'):
    return f'upload_{int(time.time())}'
  return name

def unique_path(root: str, name: str) -> str:
  """Return a path under ``root`` for ``name`` that does not exist yet.

  If ``root/name`` is free it is returned unchanged. Otherwise ``_1``, ``_2``,
  ... is inserted in front of the first dot of ``name`` (or appended when the
  name has no dot) until an unused path is found.
  """
  candidate = os.path.join(root, name)
  base, first_dot, rest = name.partition('.')
  tail = first_dot + rest  # empty when the name contains no dot
  counter = 0
  while os.path.exists(candidate):
    counter += 1
    candidate = os.path.join(root, f"{base}_{counter}{tail}")
  return candidate

def _parse_content_disposition(v: str) -> dict:
  # 예: 'form-data; name="files"; filename="a.jpg"'
  out = {}
  parts = [p.strip() for p in v.split(';')]
  if parts:
    out['type'] = parts[0].lower()
  for p in parts[1:]:
    if '=' in p:
      k, val = p.split('=', 1)
      val = val.strip().strip('"')
      out[k.strip().lower()] = val
  return out

class UploadHandler(SimpleHTTPRequestHandler):
  """Static file handler extended with an upload page and POST/PUT uploads.

  GET  /upload      -> small HTML page that uploads the chosen files via PUT.
  GET  other paths  -> regular static serving by SimpleHTTPRequestHandler.
  POST /upload      -> multipart/form-data; each part with a filename is saved.
  PUT  /<filename>  -> request body saved under a sanitized, unused name.

  Uploaded files are written into ``self.directory``.
  """

  def _content_length(self):
    """Return the Content-Length header as an int, or None when it is absent.

    Raises:
      ValueError: The header is present but is not a non-negative integer.
    """
    raw = self.headers.get('Content-Length')
    if not raw:
      return None
    value = int(raw)
    if value < 0:
      raise ValueError(f"negative Content-Length: {raw!r}")
    return value

  # Upload form page
  def do_GET(self):
    """Serve the upload page on /upload; delegate every other path to the base class."""
    if self.path.rstrip('/') == '/upload':
      self.send_response(200)
      self.send_header('Content-Type', 'text/html; charset=utf-8')
      self.end_headers()
      self.wfile.write(b"""<!doctype html><meta charset="utf-8">
<title>Upload</title>
<h3>PUT</h3>
<input id=f type=file multiple>
<button id=btn>Upload via PUT</button>
<script>
document.getElementById('btn').onclick = async () => {
  const fs = document.getElementById('f').files;
  for (const f of fs) {
    const res = await fetch('/' + encodeURIComponent(f.name), {method:'PUT', body:f});
    console.log(f.name, await res.text());
  }
  alert('PUT upload done');
};
</script>
""")
      return
    return super().do_GET()

  # multipart/form-data parsed by hand
  def do_POST(self):
    """Save every file part of a multipart/form-data POST to /upload.

    Responds 404 for other paths, 400 for a wrong content type, a missing
    boundary or a malformed Content-Length, 411 without Content-Length and
    413 above MAX_UPLOAD. On success it answers 303 (redirect to "/") with a
    short HTML summary of the saved files.
    """
    if self.path.rstrip('/') != '/upload':
      self.send_error(404, "POST only supported on /upload")
      return

    ctype = self.headers.get('Content-Type', '')
    if not ctype.lower().startswith('multipart/form-data'):
      self.send_error(400, "Content-Type must be multipart/form-data")
      return

    # Extract the boundary parameter from the Content-Type header.
    boundary = None
    for param in ctype.split(';'):
      param = param.strip()
      if param.lower().startswith('boundary='):
        boundary = param.split('=', 1)[1]
        if boundary.startswith('"') and boundary.endswith('"'):
          boundary = boundary[1:-1]
        break
    if boundary:
      boundary = boundary.encode('ascii', 'ignore')
    if not boundary:
      # Also covers a boundary that is empty once non-ASCII characters are
      # dropped; splitting on a bare b'--' would shred the body.
      self.send_error(400, "Missing multipart boundary")
      return

    # The whole body is read at once, so a declared length is mandatory.
    try:
      length = self._content_length()
    except ValueError:
      self.send_error(400, "Invalid Content-Length")
      return
    if length is None:
      self.send_error(411, "Content-Length required")
      return
    if length > MAX_UPLOAD:
      self.send_error(413, f"Upload too large (> {MAX_UPLOAD} bytes)")
      return
    body = self.rfile.read(length)

    # Split into parts: the first segment is the preamble (usually empty),
    # the segment that starts with '--' is the closing delimiter.
    delim = b'--' + boundary
    saved = []

    for seg in body.split(delim)[1:]:
      if seg.startswith(b'--'):
        break
      # Each segment begins with the CRLF that ends the delimiter line.
      if seg.startswith(b'\r\n'):
        seg = seg[2:]

      header_bytes, sep, content = seg.partition(b'\r\n\r\n')
      if not sep:
        continue  # no header/body separator -> not a usable part

      # The CRLF in front of the next delimiter is not part of the content.
      if content.endswith(b'\r\n'):
        content = content[:-2]

      # Part headers; lines without a colon are ignored.
      headers = {}
      for line in header_bytes.split(b'\r\n'):
        key, colon, value = line.decode('latin-1').partition(':')
        if colon:
          headers[key.strip().lower()] = value.strip()

      dispo = _parse_content_disposition(headers.get('content-disposition', ''))
      filename = dispo.get('filename')
      if not filename:
        # Ordinary form field rather than a file -> skip.
        continue

      path = unique_path(self.directory, safe_name(filename))
      with open(path, 'wb') as f:
        f.write(content)
      # A part can never exceed the request length, which was already checked
      # against MAX_UPLOAD above, so no per-file size check is needed.
      saved.append((os.path.basename(path), len(content)))

    # Response
    self.send_response(303)
    self.send_header('Location', '/')
    self.send_header('Content-Type', 'text/html; charset=utf-8')
    self.end_headers()
    msg = "<br>".join(f"Saved {name} ({size} bytes)" for name, size in saved) or "No files uploaded"
    self.wfile.write(f"<!doctype html><meta charset='utf-8'><p>{msg}</p><a href='/'>Go to index</a>".encode('utf-8'))

  # PUT upload (used by curl / the JS on the upload page)
  def do_PUT(self):
    """Save the request body as a file named after the last URL path segment.

    The path is stripped of query/fragment and percent-decoded before it is
    sanitized, so a name sent with encodeURIComponent() is restored first.
    Responds 400 for a malformed Content-Length or unusable name, 413 above
    MAX_UPLOAD and 201 with the stored file name on success. Without a
    Content-Length the body is read until the client closes the stream.
    """
    from urllib.parse import unquote

    try:
      length = self._content_length()
    except ValueError:
      self.send_error(400, "Invalid Content-Length")
      return
    if length is not None and length > MAX_UPLOAD:
      self.send_error(413, f"Upload too large (> {MAX_UPLOAD} bytes)")
      return

    # Same order as the base class uses for GET: drop query and fragment,
    # then decode. safe_name() takes the basename afterwards, so an encoded
    # slash cannot leave the upload directory.
    raw_path = self.path.split('?', 1)[0].split('#', 1)[0]
    name = safe_name(unquote(raw_path))
    if not name or name in ('.', '..'):
      self.send_error(400, "Invalid filename")
      return
    path = unique_path(self.directory, name)

    buf = 64 * 1024
    received = 0
    too_large = False
    with open(path, 'wb') as f:
      while length is None or received < length:
        want = buf if length is None else min(buf, length - received)
        chunk = self.rfile.read(want)
        if not chunk:
          break
        received += len(chunk)
        if received > MAX_UPLOAD:
          # Only reachable without Content-Length; the limit applies there too.
          too_large = True
          break
        f.write(chunk)

    if too_large:
      try:
        os.remove(path)
      except OSError:
        pass  # nothing more can be done about the partial file
      self.send_error(413, f"Upload too large (> {MAX_UPLOAD} bytes)")
      return

    self.send_response(201, "Created")
    self.send_header('Content-Type', 'text/plain; charset=utf-8')
    self.end_headers()
    self.wfile.write(f"Saved to {os.path.basename(path)}\n".encode('utf-8'))

def main():
  """Serve DIR over HTTP with upload support on BIND:PORT until Ctrl+C."""
  served_root = os.path.abspath(DIR)
  make_handler = partial(UploadHandler, directory=DIR)
  with ThreadingHTTPServer((BIND, PORT), make_handler) as server:
    print(f"Serving {served_root} at http://{BIND}:{PORT}  (Ctrl+C to quit)")
    try:
      server.serve_forever()
    except KeyboardInterrupt:
      # Ctrl+C is the regular way to stop; fall through so the with-block
      # closes the server.
      pass

if __name__ == "__main__":
  main()
